Skip to content

Commit

Permalink
refactor(processor): stop using global variables
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Jan 18, 2023
1 parent c2994fb commit 15e9e5c
Show file tree
Hide file tree
Showing 10 changed files with 465 additions and 355 deletions.
6 changes: 2 additions & 4 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,12 @@ func initJobsDB() {
router.InitRouterAdmin()
batchrouter.Init()
batchrouter.Init2()
processor.Init()
Init()
}

func TestDynamicClusterManager(t *testing.T) {
initJobsDB()

processor.SetFeaturesRetryAttempts(0)

mockCtrl := gomock.NewController(t)
mockMTI := mock_tenantstats.NewMockMultiTenantI(mockCtrl)
mockBackendConfig := mocksBackendConfig.NewMockBackendConfig(mockCtrl)
Expand All @@ -198,7 +195,8 @@ func TestDynamicClusterManager(t *testing.T) {
"batch_rt": &jobsdb.MultiTenantLegacy{HandleT: brtDB},
})

processor := processor.New(ctx, &clearDb, gwDB, rtDB, brtDB, errDB, mockMTI, &reporting.NOOP{}, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), rsources.NewNoOpService())
processor := processor.New(ctx, &clearDb, gwDB, rtDB, brtDB, errDB, mockMTI, &reporting.NOOP{}, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), rsources.NewNoOpService(),
processor.WithFeaturesRetryMaxAttempts(0))
processor.BackendConfig = mockBackendConfig
processor.Transformer = mockTransformer
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
Expand Down
24 changes: 17 additions & 7 deletions processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type LifecycleManager struct {
HandleT *HandleT
Handle *Handle
mainCtx context.Context
currentCancel context.CancelFunc
waitGroup interface{ Wait() }
Expand All @@ -38,10 +38,10 @@ type LifecycleManager struct {
// are assuming that the DBs will be up.
func (proc *LifecycleManager) Start() error {
if proc.Transformer != nil {
proc.HandleT.transformer = proc.Transformer
proc.Handle.transformer = proc.Transformer
}

proc.HandleT.Setup(
proc.Handle.Setup(
proc.BackendConfig, proc.gatewayDB, proc.routerDB, proc.batchRouterDB, proc.errDB,
proc.clearDB, proc.ReportingI, proc.MultitenantStats, proc.transientSources, proc.fileuploader, proc.rsourcesService,
)
Expand All @@ -55,8 +55,8 @@ func (proc *LifecycleManager) Start() error {
wg.Add(1)
go func() {
defer wg.Done()
if err := proc.HandleT.Start(currentCtx); err != nil {
proc.HandleT.logger.Errorf("Error starting processor: %v", err)
if err := proc.Handle.Start(currentCtx); err != nil {
proc.Handle.logger.Errorf("Error starting processor: %v", err)
}
}()
return nil
Expand All @@ -65,17 +65,24 @@ func (proc *LifecycleManager) Start() error {
// Stop stops the processor, this is a blocking call.
func (proc *LifecycleManager) Stop() {
proc.currentCancel()
proc.HandleT.Shutdown()
proc.Handle.Shutdown()
proc.waitGroup.Wait()
}

func WithFeaturesRetryMaxAttempts(maxAttempts int) func(l *LifecycleManager) {
return func(l *LifecycleManager) {
l.Handle.config.featuresRetryMaxAttempts = maxAttempts
}
}

// New creates a new Processor instance
func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDb *jobsdb.HandleT,
tenantDB multitenant.MultiTenantI, reporting types.ReportingI, transientSources transientsource.Service, fileuploader fileuploader.Provider,
rsourcesService rsources.JobService,
opts ...func(l *LifecycleManager),
) *LifecycleManager {
proc := &LifecycleManager{
HandleT: &HandleT{transformer: transformer.NewTransformer()},
Handle: NewHandle(transformer.NewTransformer()),
mainCtx: ctx,
gatewayDB: gwDb,
routerDB: rtDb,
Expand All @@ -89,5 +96,8 @@ func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDb *jobsdb.Ha
fileuploader: fileuploader,
rsourcesService: rsourcesService,
}
for _, opt := range opts {
opt(proc)
}
return proc
}
38 changes: 27 additions & 11 deletions processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"

backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/reporting"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
"github.com/rudderlabs/rudder-server/services/fileuploader"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/rudderlabs/rudder-server/services/archiver"
"github.com/rudderlabs/rudder-server/services/multitenant"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/pubsub"
)

var (
Expand Down Expand Up @@ -126,7 +128,6 @@ func initJobsDB() {
jobsdb.Init2()
jobsdb.Init3()
archiver.Init()
Init()
}

func genJobs(customVal string, jobCount, eventsPerJob int) []*jobsdb.JobT {
Expand All @@ -145,16 +146,12 @@ func genJobs(customVal string, jobCount, eventsPerJob int) []*jobsdb.JobT {
}

func TestProcessorManager(t *testing.T) {
temp := isUnLocked
defer func() { isUnLocked = temp }()
initJobsDB()
mockCtrl := gomock.NewController(t)
mockBackendConfig := mocksBackendConfig.NewMockBackendConfig(mockCtrl)
mockTransformer := mocksTransformer.NewMockTransformer(mockCtrl)
mockRsourcesService := rsources.NewMockJobService(mockCtrl)

SetFeaturesRetryAttempts(0)
enablePipelining = false
RegisterTestingT(t)
triggerAddNewDS := make(chan time.Time)
maxDSSize := 10
Expand Down Expand Up @@ -201,7 +198,11 @@ func TestProcessorManager(t *testing.T) {
"batch_rt": &jobsdb.MultiTenantLegacy{HandleT: brtDB},
},
}
processor := New(ctx, &clearDb, gwDB, rtDB, brtDB, errDB, mtStat, &reporting.NOOP{}, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), mockRsourcesService)
processor := New(ctx, &clearDb, gwDB, rtDB, brtDB, errDB, mtStat, &reporting.NOOP{}, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), mockRsourcesService,
func(m *LifecycleManager) {
m.Handle.config.enablePipelining = false
m.Handle.config.featuresRetryMaxAttempts = 0
})

t.Run("jobs are already there in GW DB before processor starts", func(t *testing.T) {
require.NoError(t, gwDB.Start())
Expand All @@ -212,14 +213,21 @@ func TestProcessorManager(t *testing.T) {
defer brtDB.Stop()
require.NoError(t, errDB.Start())
defer errDB.Stop()
mockBackendConfig.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
mockBackendConfig.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1).DoAndReturn(
func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel {
ch := make(chan pubsub.DataEvent, 1)
ch <- pubsub.DataEvent{Data: map[string]backendconfig.ConfigT{sampleWorkspaceID: sampleBackendConfig}, Topic: string(topic)}
close(ch)
return ch
},
)
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup().Times(1).Do(func() {
processor.HandleT.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
})
mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1)
processor.BackendConfig = mockBackendConfig
processor.HandleT.transformer = mockTransformer
processor.Handle.transformer = mockTransformer
require.NoError(t, processor.Start())
defer processor.Stop()
Eventually(func() int {
Expand All @@ -242,10 +250,18 @@ func TestProcessorManager(t *testing.T) {
defer brtDB.Stop()
require.NoError(t, errDB.Start())
defer errDB.Stop()
mockBackendConfig.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
mockBackendConfig.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1).DoAndReturn(
func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel {
ch := make(chan pubsub.DataEvent, 1)
ch <- pubsub.DataEvent{Data: map[string]backendconfig.ConfigT{sampleWorkspaceID: sampleBackendConfig}, Topic: string(topic)}
close(ch)
return ch
},
)

mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup().Times(1).Do(func() {
processor.HandleT.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
})
mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1)

Expand Down
Loading

0 comments on commit 15e9e5c

Please sign in to comment.