Skip to content

Commit

Permalink
refactor factory code
Browse files Browse the repository at this point in the history
  • Loading branch information
saurav-malani committed Mar 30, 2023
1 parent 86402e6 commit 989c896
Showing 1 changed file with 39 additions and 66 deletions.
105 changes: 39 additions & 66 deletions enterprise/suppress-user/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand All @@ -24,118 +25,90 @@ type Factory struct {
Log logger.Logger
}

func (m *Factory) NewSyncerWithBadgerRepo(repoPath string, seederSource func() (io.ReadCloser, error), identity identity.Identifier, pollInterval time.Duration) (*Syncer, Repository, error) {
repo, err := NewBadgerRepository(
repoPath,
m.Log,
WithSeederSource(seederSource))
if err != nil {
return nil, nil, fmt.Errorf("could not create badger repository: %w", err)
}
syncer, err := NewSyncer(
config.GetString("SUPPRESS_USER_BACKEND_URL", "https://api.rudderstack.com"),
identity,
repo,
WithLogger(m.Log),
WithHttpClient(&http.Client{Timeout: config.GetDuration("HttpClient.suppressUser.timeout", 30, time.Second)}),
WithPageSize(config.GetInt("BackendConfig.Regulations.pageSize", 5000)),
WithPollIntervalFn(func() time.Duration { return pollInterval }),
)
if err != nil {
return nil, nil, err
}
return syncer, repo, nil
}

// Setup initializes the user suppression feature
func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.BackendConfig) (types.UserSuppression, error) {
if m.Log == nil {
m.Log = logger.NewLogger().Child("enterprise").Child("suppress-user")
}

if m.EnterpriseToken == "" {
m.Log.Info("Suppress User feature is enterprise only")
return &NOOP{}, nil
}

m.Log.Info("Setting up Suppress User Feature")

backendConfig.WaitForConfig(ctx)

var pollInterval time.Duration
config.RegisterDurationConfigVariable(300, &pollInterval, true, time.Second, "BackendConfig.Regulations.pollInterval")

var latestRepo Repository
// set up the repository
var repo *RepoSwitcher
if config.GetBool("BackendConfig.Regulations.useBadgerDB", true) {
useBadgerDB := config.GetBool("BackendConfig.Regulations.useBadgerDB", true)
if useBadgerDB {
fullSuppressionPath, latestSuppressionPath, err := getRepoPath()
if err != nil {
return nil, fmt.Errorf("could not get repo path: %w", err)
}

identity := backendConfig.Identity()

if _, err = os.Stat(fullSuppressionPath); err != nil && os.IsNotExist(err) {
// start NewBadergerRepository with latestSuppressionPath
latestRepo, err = NewBadgerRepository(
latestSuppressionPath,
m.Log,
WithSeederSource(latestDataSeed))
if err != nil {
return nil, fmt.Errorf("could not create badger repository: %w", err)
}
// start syncLoop of latest repo
syncer, err := NewSyncer(
config.GetString("SUPPRESS_USER_BACKEND_URL", "https://api.rudderstack.com"),
backendConfig.Identity(),
latestRepo,
WithLogger(m.Log),
WithHttpClient(&http.Client{Timeout: config.GetDuration("HttpClient.suppressUser.timeout", 30, time.Second)}),
WithPageSize(config.GetInt("BackendConfig.Regulations.pageSize", 5000)),
WithPollIntervalFn(func() time.Duration { return pollInterval }),
)
syncer, latestRepo, err := m.NewSyncerWithBadgerRepo(latestSuppressionPath, latestDataSeed, identity, pollInterval)
if err != nil {
return nil, err
}

subCtx, latestSyncCancel := context.WithCancel(ctx)
rruntime.Go(func() {
syncer.SyncLoop(subCtx)
err = latestRepo.Stop()
defer latestSyncCancel()
})
tmp := RepoSwitcher{
repo = &RepoSwitcher{
Repository: latestRepo,
mu: sync.RWMutex{},
}
repo = &tmp
// in a go routine
rruntime.Go(func() {
// create NewBadgerRepository with fullSuppressionPath
fullRepo, err := NewBadgerRepository(
fullSuppressionPath,
m.Log,
WithSeederSource(fullDataSeed))
if err != nil {
m.Log.Error("Complete Synce failed: could not create badger repository: %w", err)
return
}
// create syncer with fullRepo
syncer, err := NewSyncer(
config.GetString("SUPPRESS_USER_BACKEND_URL", "https://api.rudderstack.com"),
backendConfig.Identity(),
fullRepo,
WithLogger(m.Log),
WithHttpClient(&http.Client{Timeout: config.GetDuration("HttpClient.suppressUser.timeout", 30, time.Second)}),
WithPageSize(config.GetInt("BackendConfig.Regulations.pageSize", 5000)),
WithPollIntervalFn(func() time.Duration { return pollInterval }),
)
syncer, fullRepo, err := m.NewSyncerWithBadgerRepo(fullSuppressionPath, fullDataSeed, identity, pollInterval)
if err != nil {
m.Log.Error("Complete Synce failed: could not create syncer: %w", err)
return
}
// complete it's 1st sync
if err = syncer.Sync(ctx); err != nil {
m.Log.Error("Complete Synce failed: could not sync: %w", err)
}
// stop latest repo sync loop
latestSyncCancel()
// switch repo
repo.Switch(fullRepo)
// start syncLoop of full repo
syncer.SyncLoop(ctx)
_ = fullRepo.Stop()
})
} else {
// start NewBadgerRepository with fullSuppressionPath
fullRepo, err := NewBadgerRepository(
fullSuppressionPath,
m.Log)
if err != nil {
m.Log.Error("Complete Synce failed: could not create badger repository: %w", err)
return nil, err
}

// start sync of this repo
// start syncLoop of latest repo
syncer, err := NewSyncer(
config.GetString("SUPPRESS_USER_BACKEND_URL", "https://api.rudderstack.com"),
backendConfig.Identity(),
fullRepo,
WithLogger(m.Log),
WithHttpClient(&http.Client{Timeout: config.GetDuration("HttpClient.suppressUser.timeout", 30, time.Second)}),
WithPageSize(config.GetInt("BackendConfig.Regulations.pageSize", 5000)),
WithPollIntervalFn(func() time.Duration { return pollInterval }),
)
syncer, fullRepo, err := m.NewSyncerWithBadgerRepo(fullSuppressionPath, nil, identity, pollInterval)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 989c896

Please sign in to comment.