Skip to content

Commit

Permalink
feat: archive jobs to object storage (#3721)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Aug 17, 2023
1 parent 264c52a commit c65ded3
Show file tree
Hide file tree
Showing 17 changed files with 1,811 additions and 17 deletions.
25 changes: 25 additions & 0 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
"github.com/rudderlabs/rudder-server/archiver"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/gateway"
gwThrottler "github.com/rudderlabs/rudder-server/gateway/throttler"
Expand Down Expand Up @@ -200,6 +202,20 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
)
defer schemaDB.Close()

archivalDB := jobsdb.NewForReadWrite(
"arc",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithJobMaxAge(
func() time.Duration {
return config.GetDuration("archival.jobRetention", 24, time.Hour)
},
),
)
defer archivalDB.Close()

var schemaForwarder schema_forwarder.Forwarder
if config.GetBool("EventSchemas2.enabled", false) {
Expand Down Expand Up @@ -229,6 +245,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
errDBForRead,
errDBForWrite,
schemaDB,
archivalDB,
reportingI,
transientSources,
fileUploaderProvider,
Expand Down Expand Up @@ -272,9 +289,17 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
BatchRouterDB: batchRouterDB,
ErrorDB: errDBForRead,
EventSchemaDB: schemaDB,
ArchivalDB: archivalDB,
Processor: proc,
Router: rt,
SchemaForwarder: schemaForwarder,
Archiver: archiver.New(
archivalDB,
fileUploaderProvider,
config,
stats.Default,
archiver.WithAdaptiveLimit(adaptiveLimit),
),
}

rateLimiter, err := gwThrottler.New(stats.Default)
Expand Down
24 changes: 24 additions & 0 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/archiver"
"github.com/rudderlabs/rudder-server/internal/pulsar"
"github.com/rudderlabs/rudder-server/router/throttler"
schema_forwarder "github.com/rudderlabs/rudder-server/schema-forwarder"
Expand Down Expand Up @@ -193,6 +194,20 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer schemaDB.Close()

archivalDB := jobsdb.NewForReadWrite(
"arc",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithJobMaxAge(
func() time.Duration {
return config.GetDuration("archival.jobRetention", 24, time.Hour)
},
),
)
defer archivalDB.Close()

var schemaForwarder schema_forwarder.Forwarder
if config.GetBool("EventSchemas2.enabled", false) {
Expand Down Expand Up @@ -222,6 +237,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
errDBForRead,
errDBForWrite,
schemaDB,
archivalDB,
reportingI,
transientSources,
fileUploaderProvider,
Expand Down Expand Up @@ -267,8 +283,16 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
ErrorDB: errDBForRead,
SchemaForwarder: schemaForwarder,
EventSchemaDB: schemaDB,
ArchivalDB: archivalDB,
Processor: p,
Router: rt,
Archiver: archiver.New(
archivalDB,
fileUploaderProvider,
config.Default,
stats.Default,
archiver.WithAdaptiveLimit(adaptiveLimit),
),
}

g.Go(func() error {
Expand Down
12 changes: 12 additions & 0 deletions app/cluster/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ type Dynamic struct {
BatchRouterDB lifecycle
ErrorDB lifecycle
EventSchemaDB lifecycle
ArchivalDB lifecycle

Processor lifecycle
Router lifecycle

SchemaForwarder lifecycle
Archiver lifecycle

currentMode servermode.Mode
currentWorkspaceIDs string
Expand Down Expand Up @@ -164,6 +166,9 @@ func (d *Dynamic) start() error {
if err := d.EventSchemaDB.Start(); err != nil {
return fmt.Errorf("event schemas db start: %w", err)
}
if err := d.ArchivalDB.Start(); err != nil {
return fmt.Errorf("archival db start: %w", err)
}
if err := d.RouterDB.Start(); err != nil {
return fmt.Errorf("router db start: %w", err)
}
Expand All @@ -173,6 +178,9 @@ func (d *Dynamic) start() error {
if err := d.Processor.Start(); err != nil {
return fmt.Errorf("processor start: %w", err)
}
if err := d.Archiver.Start(); err != nil {
return fmt.Errorf("archiver start: %w", err)
}
if err := d.SchemaForwarder.Start(); err != nil {
return fmt.Errorf("jobs forwarder start: %w", err)
}
Expand All @@ -195,6 +203,8 @@ func (d *Dynamic) stop() {
d.logger.Debug("Router stopped")
d.SchemaForwarder.Stop()
d.logger.Debug("JobsForwarder stopped")
d.Archiver.Stop()
d.logger.Debug("Archiver stopped")
d.Processor.Stop()
d.logger.Debug("Processor stopped")

Expand All @@ -204,6 +214,8 @@ func (d *Dynamic) stop() {
d.logger.Debug("RouterDB stopped")
d.EventSchemaDB.Stop()
d.logger.Debug("EventSchemasDB stopped")
d.ArchivalDB.Stop()
d.logger.Debug("ArchivalDB stopped")
d.GatewayDB.Stop()
d.logger.Debug("GatewayDB stopped")
d.ErrorDB.Stop()
Expand Down
4 changes: 4 additions & 0 deletions app/cluster/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func TestDynamicCluster(t *testing.T) {
errorDB := &mockLifecycle{status: "", callCount: &callCount}
schemaForwarder := mockjobsforwarder.NewMockForwarder(gomock.NewController(t))
eschDB := &mockLifecycle{status: "", callCount: &callCount}
archDB := &mockLifecycle{status: "", callCount: &callCount}
archiver := &mockLifecycle{status: "", callCount: &callCount}

processor := &mockLifecycle{status: "", callCount: &callCount}
router := &mockLifecycle{status: "", callCount: &callCount}
Expand All @@ -91,10 +93,12 @@ func TestDynamicCluster(t *testing.T) {
BatchRouterDB: batchRouterDB,
ErrorDB: errorDB,
EventSchemaDB: eschDB,
ArchivalDB: archDB,

Processor: processor,
Router: router,
SchemaForwarder: schemaForwarder,
Archiver: archiver,

BackendConfig: backendConfig,
}
Expand Down
15 changes: 15 additions & 0 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

arc "github.com/rudderlabs/rudder-server/archiver"
mock_jobs_forwarder "github.com/rudderlabs/rudder-server/mocks/jobs-forwarder"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"

Expand All @@ -29,6 +30,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/app/cluster"
backendConfig "github.com/rudderlabs/rudder-server/backend-config"
Expand Down Expand Up @@ -180,10 +182,15 @@ func TestDynamicClusterManager(t *testing.T) {
defer gwDB.TearDown()
eschDB := jobsdb.NewForReadWrite("esch")
defer eschDB.TearDown()
archiveDB := jobsdb.NewForReadWrite("archive")
defer archiveDB.TearDown()
rtDB := jobsdb.NewForReadWrite("rt")
defer rtDB.TearDown()
brtDB := jobsdb.NewForReadWrite("batch_rt")
defer brtDB.TearDown()

archDB := jobsdb.NewForReadWrite("archival")
defer archDB.TearDown()
readErrDB := jobsdb.NewForRead("proc_error")
defer readErrDB.TearDown()
writeErrDB := jobsdb.NewForWrite("proc_error")
Expand All @@ -204,6 +211,7 @@ func TestDynamicClusterManager(t *testing.T) {
readErrDB,
writeErrDB,
eschDB,
archDB,
&reporting.NOOP{},
transientsource.NewEmptyService(),
fileuploader.NewDefaultProvider(),
Expand Down Expand Up @@ -257,7 +265,14 @@ func TestDynamicClusterManager(t *testing.T) {
BatchRouterDB: brtDB,
ErrorDB: readErrDB,
EventSchemaDB: eschDB,
ArchivalDB: archDB,
SchemaForwarder: schemaForwarder,
Archiver: arc.New(
archiveDB,
nil,
config.Default,
stats.Default,
),

Processor: processor,
Router: router,
Expand Down

0 comments on commit c65ded3

Please sign in to comment.