From e7c70ecb9902073bb78539250a5e28adc470025b Mon Sep 17 00:00:00 2001 From: saurav-malani Date: Tue, 4 Apr 2023 12:45:40 +0530 Subject: [PATCH] addressed review comment --- enterprise/suppress-user/factory.go | 80 +++++++++++++++--------- enterprise/suppress-user/factory_test.go | 18 ------ 2 files changed, 49 insertions(+), 49 deletions(-) diff --git a/enterprise/suppress-user/factory.go b/enterprise/suppress-user/factory.go index 4ba6f0e0ab4..9b45c5dd3c0 100644 --- a/enterprise/suppress-user/factory.go +++ b/enterprise/suppress-user/factory.go @@ -1,5 +1,18 @@ package suppression +/* +The suppression package provides functionality to manage user suppression lists in a server application. +This package includes a rudder-server suppression component that has been modified to make use of the +suppression-backup-service to get the latest and complete user suppression list, instead of +syncing with the data-regulation-service. + +This modification drastically reduces the suppression sync time at the gateway, ensuring that the gateway +starts only once the latest user suppressions are available. The package also retrieves the full suppression +list asynchronously into a separate badgerdb repository, since fetching the full list might take some time. +Once the full list is available in that repository, the package swaps the old badgerdb (holding only the latest suppressed users) +with the new badgerdb (holding all suppressed users). 
+*/ + import ( "context" "fmt" @@ -8,7 +21,6 @@ import ( "os" "path" "path/filepath" - "sync" "time" "github.com/rudderlabs/rudder-go-kit/config" @@ -46,19 +58,24 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend useBadgerDB := config.GetBool("BackendConfig.Regulations.useBadgerDB", true) if useBadgerDB { - identity := backendConfig.Identity() + identifier := backendConfig.Identity() fullSuppressionPath, latestSuppressionPath, err := getRepoPath() if err != nil { return nil, fmt.Errorf("could not get repo path: %w", err) } - if !alreadySeeded(fullSuppressionPath) && config.IsSet("SUPPRESS_USER_BACKUP_SERVICE_URL") { - repo := &RepoSwitcher{ - mu: sync.RWMutex{}, - } + if !alreadySynced(fullSuppressionPath) && config.IsSet("SUPPRESS_USER_BACKUP_SERVICE_URL") { + _ = os.RemoveAll(fullSuppressionPath) + _ = os.RemoveAll(latestSuppressionPath) + // First starting a repository seeded with the latest data which is faster to load - latestSyncer, latestRepo, err := m.newSyncerWithBadgerRepo(latestSuppressionPath, latestDataSeed, config.GetDuration("BackendConfig.Regulations.maxSeedWait", 5, time.Second), identity, pollInterval) + latestSyncer, latestRepo, err := m.newSyncerWithBadgerRepo( + latestSuppressionPath, + latestDataSeed, + config.GetDuration("BackendConfig.Regulations.maxSeedWait", 5, time.Second), + identifier, + pollInterval) if err != nil { return nil, err } @@ -68,32 +85,35 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend latestSyncer.SyncLoop(subCtx) err = latestRepo.Stop() if err != nil { - m.Log.Error("Latest Sync failed: could not stop repo: %w", err) + m.Log.Warnf("Latest Sync failed: could not stop repo: %w", err) } err = os.RemoveAll(latestSuppressionPath) if err != nil { - m.Log.Error("Latest Sync failed: could not remove repo: %w", err) + m.Log.Errorf("Latest Sync failed: could not remove repo: %w", err) } }) - repo.Repository = latestRepo + + repo := &RepoSwitcher{Repository: 
latestRepo} rruntime.Go(func() { var fullSyncer *Syncer var fullRepo Repository var err error - m.retry(ctx, + m.retryIndefinitely(ctx, func() error { - fullSyncer, fullRepo, err = m.newSyncerWithBadgerRepo(fullSuppressionPath, fullDataSeed, 0, identity, pollInterval) + fullSyncer, fullRepo, err = m.newSyncerWithBadgerRepo(fullSuppressionPath, fullDataSeed, 0, identifier, pollInterval) return err }, 5*time.Second) - m.retry(ctx, + m.retryIndefinitely(ctx, func() error { return fullSyncer.Sync(ctx) }, 5*time.Second) - _, err = os.Create(filepath.Join(fullSuppressionPath, model.SyncDoneMarker)) - if err != nil { - m.Log.Error("Could not create sync done marker: %w", err) - } + + m.retryIndefinitely(ctx, func() error { + _, err = os.Create(filepath.Join(fullSuppressionPath, model.SyncDoneMarker)) + return err + }, 1*time.Second) + repo.Switch(fullRepo) latestSyncCancel() fullSyncer.SyncLoop(ctx) @@ -101,13 +121,16 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend }) return newHandler(repo, m.Log), nil } else { - syncer, fullRepo, err := m.newSyncerWithBadgerRepo(fullSuppressionPath, nil, 0, identity, pollInterval) + syncer, fullRepo, err := m.newSyncerWithBadgerRepo(fullSuppressionPath, nil, 0, identifier, pollInterval) if err != nil { return nil, err } rruntime.Go(func() { syncer.SyncLoop(ctx) - _ = fullRepo.Stop() + err = fullRepo.Stop() + if err != nil { + m.Log.Warnf("Full Sync failed: could not stop repo: %w", err) + } }) return newHandler(fullRepo, m.Log), nil } @@ -128,6 +151,9 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend rruntime.Go(func() { syncer.SyncLoop(ctx) err = memoryRepo.Stop() + if err != nil { + m.Log.Warnf("Sync failed: could not stop repo: %w", err) + } }) h := newHandler(memoryRepo, m.Log) @@ -135,20 +161,12 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend } } -func alreadySeeded(repoPath string) bool { - _, err := os.Stat(repoPath) - if 
os.IsNotExist(err) { - return false - } - _, err = os.Stat(path.Join(repoPath, model.SyncDoneMarker)) - if os.IsNotExist(err) { - os.RemoveAll(repoPath) - return false - } - return true +func alreadySynced(repoPath string) bool { + _, err := os.Stat(path.Join(repoPath, model.SyncDoneMarker)) + return !os.IsNotExist(err) } -func (m *Factory) retry(ctx context.Context, f func() error, wait time.Duration) { +func (m *Factory) retryIndefinitely(ctx context.Context, f func() error, wait time.Duration) { var err error for { err = f() diff --git a/enterprise/suppress-user/factory_test.go b/enterprise/suppress-user/factory_test.go index 3e05b258fb0..d43c1015a51 100644 --- a/enterprise/suppress-user/factory_test.go +++ b/enterprise/suppress-user/factory_test.go @@ -20,10 +20,6 @@ import ( "github.com/stretchr/testify/require" ) -const ( - namespaceID = "spaghetti" -) - func TestSuppressionSetup(t *testing.T) { srv := httptest.NewServer(httpHandler(t)) defer t.Cleanup(srv.Close) @@ -87,7 +83,6 @@ func httpHandler(t *testing.T) http.Handler { t.Helper() srvMux := mux.NewRouter() srvMux.HandleFunc("/workspaceConfig", getSingleTenantWorkspaceConfig).Methods(http.MethodGet) - srvMux.HandleFunc("/data-plane/v1/namespaces/{namespace_id}/config", getMultiTenantNamespaceConfig).Methods(http.MethodGet) srvMux.HandleFunc("/full-export", func(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, "testdata/full-export") }).Methods(http.MethodGet) srvMux.HandleFunc("/latest-export", func(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, "testdata/latest-export") }).Methods(http.MethodGet) srvMux.HandleFunc("/dataplane/workspaces/{workspace_id}/regulations/suppressions", getSuppressionFromManager).Methods(http.MethodGet) @@ -113,19 +108,6 @@ func getSingleTenantWorkspaceConfig(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write(body) } -func getMultiTenantNamespaceConfig(w http.ResponseWriter, _ *http.Request) { - w.Header().Set("Content-Type", 
"application/json") - config := map[string]backendconfig.ConfigT{namespaceID: { - WorkspaceID: "reg-test-workspaceId", - }} - body, err := json.Marshal(config) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - _, _ = w.Write(body) -} - func getSuppressionFromManager(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") s := suppressionsResponse{