Skip to content

Commit

Permalink
feat: rudder-server modification to use suppression-backup service. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
saurav-malani committed Apr 5, 2023
1 parent c7210fa commit daf3e26
Show file tree
Hide file tree
Showing 8 changed files with 392 additions and 37 deletions.
217 changes: 191 additions & 26 deletions enterprise/suppress-user/factory.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
package suppression

/*
The suppression package provides functionality to manage user suppression lists in a server application.
This package includes a rudder-server suppression component that has been modified to make use of the
suppression-backup-service in order to get the latest and complete user suppression list, instead of
syncing with the data-regulation-service.
This modification drastically reduces the suppression sync time at the gateway, ensuring that the gateway
starts only once the latest user suppressions are available. The package also includes functionality to
asynchronously retrieve the full suppression list in a separate badgerdb repository, as it might take some time.
Once the full list is available in the badgerdb, the package provides functionality to swap the old badgerdb (with latest users)
with the new badgerdb (with all users).
*/

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand All @@ -26,57 +43,205 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend
if m.Log == nil {
m.Log = logger.NewLogger().Child("enterprise").Child("suppress-user")
}

if m.EnterpriseToken == "" {
m.Log.Info("Suppress User feature is enterprise only")
return &NOOP{}, nil
}

m.Log.Info("Setting up Suppress User Feature")

backendConfig.WaitForConfig(ctx)
var repository Repository
if config.GetBool("BackendConfig.Regulations.useBadgerDB", true) {
tmpDir, err := misc.CreateTMPDIR()

var pollInterval time.Duration
config.RegisterDurationConfigVariable(300, &pollInterval, true, time.Second, "BackendConfig.Regulations.pollInterval")

useBadgerDB := config.GetBool("BackendConfig.Regulations.useBadgerDB", true)
if useBadgerDB {
identifier := backendConfig.Identity()

fullSuppressionPath, latestSuppressionPath, err := getRepoPath()
if err != nil {
return nil, fmt.Errorf("could not create tmp dir: %w", err)
return nil, fmt.Errorf("could not get repo path: %w", err)
}
path := path.Join(tmpDir, "suppression")

// TODO: implement seeder source, to retrieve the initial state from a persisted backup
var seederSource func() (io.Reader, error)
if !alreadySynced(fullSuppressionPath) && config.IsSet("SUPPRESS_USER_BACKUP_SERVICE_URL") {
_ = os.RemoveAll(fullSuppressionPath)
_ = os.RemoveAll(latestSuppressionPath)

repository, err = NewBadgerRepository(
path,
m.Log,
WithSeederSource(seederSource),
WithMaxSeedWait(config.GetDuration("BackendConfig.Regulations.maxSeedWait", 5, time.Second)))
if err != nil {
return nil, fmt.Errorf("could not create badger repository: %w", err)
// First starting a repository seeded with the latest data which is faster to load
latestSyncer, latestRepo, err := m.newSyncerWithBadgerRepo(
latestSuppressionPath,
latestDataSeed,
config.GetDuration("BackendConfig.Regulations.maxSeedWait", 5, time.Second),
identifier,
pollInterval)
if err != nil {
return nil, err
}

subCtx, latestSyncCancel := context.WithCancel(ctx)
rruntime.Go(func() {
latestSyncer.SyncLoop(subCtx)
err = latestRepo.Stop()
if err != nil {
m.Log.Warnf("Latest Sync failed: could not stop repo: %w", err)
}
err = os.RemoveAll(latestSuppressionPath)
if err != nil {
m.Log.Errorf("Latest Sync failed: could not remove repo: %w", err)
}
})

repo := &RepoSwitcher{Repository: latestRepo}
rruntime.Go(func() {
var fullSyncer *Syncer
var fullRepo Repository
var err error

m.retryIndefinitely(ctx,
func() error {
fullSyncer, fullRepo, err = m.newSyncerWithBadgerRepo(fullSuppressionPath, fullDataSeed, 0, identifier, pollInterval)
return err
}, 5*time.Second)

m.retryIndefinitely(ctx,
func() error { return fullSyncer.Sync(ctx) },
5*time.Second)

_, err = os.Create(filepath.Join(fullSuppressionPath, model.SyncDoneMarker))
if err != nil {
m.Log.Errorf("Could not create sync done marker: %w", err)
}
repo.Switch(fullRepo)
latestSyncCancel()
fullSyncer.SyncLoop(ctx)
err = fullRepo.Stop()
if err != nil {
m.Log.Warnf("Full Sync failed: could not stop repo: %w", err)
}
})
return newHandler(repo, m.Log), nil
} else {
syncer, fullRepo, err := m.newSyncerWithBadgerRepo(fullSuppressionPath, nil, 0, identifier, pollInterval)
if err != nil {
return nil, err
}
rruntime.Go(func() {
syncer.SyncLoop(ctx)
err = fullRepo.Stop()
if err != nil {
m.Log.Warnf("could not stop full sync repo: %w", err)
}
})
return newHandler(fullRepo, m.Log), nil
}
} else {
repository = NewMemoryRepository(m.Log)
memoryRepo := NewMemoryRepository(m.Log)
syncer, err := NewSyncer(
config.GetString("SUPPRESS_USER_BACKEND_URL", "https://api.rudderstack.com"),
backendConfig.Identity(),
memoryRepo,
WithLogger(m.Log),
WithHttpClient(&http.Client{Timeout: config.GetDuration("HttpClient.suppressUser.timeout", 30, time.Second)}),
WithPageSize(config.GetInt("BackendConfig.Regulations.pageSize", 5000)),
WithPollIntervalFn(func() time.Duration { return pollInterval }),
)
if err != nil {
return nil, err
}
rruntime.Go(func() {
syncer.SyncLoop(ctx)
err = memoryRepo.Stop()
if err != nil {
m.Log.Warnf("Sync failed: could not stop repo: %w", err)
}
})
h := newHandler(memoryRepo, m.Log)

return h, nil
}
}

var pollInterval time.Duration
config.RegisterDurationConfigVariable(300, &pollInterval, true, time.Second, "BackendConfig.Regulations.pollInterval")
func alreadySynced(repoPath string) bool {
_, err := os.Stat(path.Join(repoPath, model.SyncDoneMarker))
return err == nil
}

func (m *Factory) retryIndefinitely(ctx context.Context, f func() error, wait time.Duration) {
var err error
for {
err = f()
if err == nil {
return
}
m.Log.Errorf("retry failed: %v", err)
select {
case <-ctx.Done():
return
case <-time.After(wait):
}
}
}

func (m *Factory) newSyncerWithBadgerRepo(repoPath string, seederSource func() (io.ReadCloser, error), maxSeedWaitTime time.Duration, identity identity.Identifier, pollInterval time.Duration) (*Syncer, Repository, error) {
repo, err := NewBadgerRepository(
repoPath,
m.Log,
WithSeederSource(seederSource),
WithMaxSeedWait(maxSeedWaitTime),
)
if err != nil {
return nil, nil, fmt.Errorf("could not create badger repository: %w", err)
}
syncer, err := NewSyncer(
config.GetString("SUPPRESS_USER_BACKEND_URL", "https://api.rudderstack.com"),
backendConfig.Identity(),
repository,
identity,
repo,
WithLogger(m.Log),
WithHttpClient(&http.Client{Timeout: config.GetDuration("HttpClient.suppressUser.timeout", 30, time.Second)}),
WithPageSize(config.GetInt("BackendConfig.Regulations.pageSize", 5000)),
WithPollIntervalFn(func() time.Duration { return pollInterval }),
)
if err != nil {
return nil, err
return nil, nil, err
}
return syncer, repo, nil
}

func getRepoPath() (fullSuppressionPath, latestSuppressionPath string, err error) {
tmpDir, err := misc.CreateTMPDIR()
if err != nil {
return "", "", fmt.Errorf("could not create tmp dir: %w", err)
}
fullSuppressionPath = path.Join(tmpDir, "fullSuppression")
latestSuppressionPath = path.Join(tmpDir, "latestSuppression")
return
}

h := newHandler(repository, m.Log)
func latestDataSeed() (io.ReadCloser, error) {
return seederSource("latest-export")
}

rruntime.Go(func() {
syncer.SyncLoop(ctx)
_ = repository.Stop()
})
func fullDataSeed() (io.ReadCloser, error) {
return seederSource("full-export")
}

return h, nil
func seederSource(endpoint string) (io.ReadCloser, error) {
client := http.Client{}
baseURL := config.GetString("SUPPRESS_USER_BACKUP_SERVICE_URL", "https://api.rudderstack.com")
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", baseURL, endpoint), http.NoBody)
if err != nil {
return nil, fmt.Errorf("could not create request: %w", err)
}
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("could not perform request: %w", err)
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
// close body afterwards.
return resp.Body, nil
}
123 changes: 123 additions & 0 deletions enterprise/suppress-user/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package suppression

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/admin"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/stretchr/testify/require"
)

func TestSuppressionSetup(t *testing.T) {
srv := httptest.NewServer(httpHandler(t))
defer t.Cleanup(srv.Close)
t.Setenv("WORKSPACE_TOKEN", "216Co97d9So9TkqphM0cxBzRxc3")
t.Setenv("CONFIG_BACKEND_URL", srv.URL)
t.Setenv("SUPPRESS_USER_BACKUP_SERVICE_URL", srv.URL)
t.Setenv("SUPPRESS_BACKUP_URL", srv.URL)
t.Setenv("SUPPRESS_USER_BACKEND_URL", srv.URL)
dir, err := os.MkdirTemp("/tmp", "rudder-server")
require.NoError(t, err)
t.Setenv("RUDDER_TMPDIR", dir)
defer os.RemoveAll(dir)
config.Set("Diagnostics.enableDiagnostics", false)
admin.Init()
misc.Init()
diagnostics.Init()
backendconfig.Init()

require.NoError(t, backendconfig.Setup(nil))
defer backendconfig.DefaultBackendConfig.Stop()
backendconfig.DefaultBackendConfig.StartWithIDs(context.TODO(), "")

t.Run(
"should setup badgerdb and syncer successfully after getting suppression from backup service",
func(t *testing.T) {
f := Factory{
EnterpriseToken: "token",
Log: logger.NOP,
}
ctx, cancel := context.WithCancel(context.Background())
h, err := f.Setup(ctx, backendconfig.DefaultBackendConfig)
require.NoError(t, err, "Error in setting up suppression feature")
v := h.IsSuppressedUser("workspace-1", "user-1", "src-1")
require.True(t, v)
require.Eventually(t, func() bool {
return h.IsSuppressedUser("workspace-2", "user-2", "src-4")
}, time.Second*15, time.Millisecond*100, "User should be suppressed")
cancel()
time.Sleep(time.Second * 2)
h2, err := f.Setup(context.Background(), backendconfig.DefaultBackendConfig)
require.NoError(t, err, "Error in setting up suppression feature")
require.True(t, h2.IsSuppressedUser("workspace-2", "user-2", "src-4"))
},
)
t.Run(
"should setup in-memory suppression db",
func(t *testing.T) {
f := Factory{
EnterpriseToken: "token",
Log: logger.NOP,
}
t.Setenv("RSERVER_BACKEND_CONFIG_REGULATIONS_USE_BADGER_DB", "false")
h, err := f.Setup(context.Background(), backendconfig.DefaultBackendConfig)
require.NoError(t, err, "Error in setting up suppression feature")
require.False(t, h.IsSuppressedUser("workspace-1", "user-1", "src-1"))
},
)
}

func httpHandler(t *testing.T) http.Handler {
t.Helper()
srvMux := mux.NewRouter()
srvMux.HandleFunc("/workspaceConfig", getSingleTenantWorkspaceConfig).Methods(http.MethodGet)
srvMux.HandleFunc("/full-export", func(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, "testdata/full-export") }).Methods(http.MethodGet)
srvMux.HandleFunc("/latest-export", func(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, "testdata/latest-export") }).Methods(http.MethodGet)
srvMux.HandleFunc("/dataplane/workspaces/{workspace_id}/regulations/suppressions", getSuppressionFromManager).Methods(http.MethodGet)
srvMux.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
next.ServeHTTP(w, req)
})
})

return srvMux
}

func getSingleTenantWorkspaceConfig(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
config := backendconfig.ConfigT{
WorkspaceID: "reg-test-workspaceId",
}
body, err := json.Marshal(config)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, _ = w.Write(body)
}

func getSuppressionFromManager(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
s := suppressionsResponse{
Items: []model.Suppression{},
Token: "_token_",
}
body, err := json.Marshal(s)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, _ = w.Write(body)
}
Loading

0 comments on commit daf3e26

Please sign in to comment.