Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rudder-server modification to use suppression-backup service. #3116

Merged
merged 22 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 191 additions & 26 deletions enterprise/suppress-user/factory.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
package suppression

/*
The suppression package provides functionality to manage user suppression lists in a server application.
This package includes a rudder-server suppression component that has been modified to make use of the
suppression-backup-service in order to get the latest and complete user suppression list, instead of
syncing with the data-regulation-service.

This modification drastically reduces the suppression sync time at the gateway, ensuring that the gateway
starts only once the latest user suppressions are available. The package also includes functionality to
asynchronously retrieve the full suppression list in a separate badgerdb repository, as it might take some time.
Once the full list is available in the badgerdb, the package provides functionality to swap the old badgerdb (with latest users)
with the new badgerdb (with all users).
*/

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand All @@ -26,57 +43,205 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend
if m.Log == nil {
m.Log = logger.NewLogger().Child("enterprise").Child("suppress-user")
}

if m.EnterpriseToken == "" {
m.Log.Info("Suppress User feature is enterprise only")
return &NOOP{}, nil
}

m.Log.Info("Setting up Suppress User Feature")

backendConfig.WaitForConfig(ctx)
var repository Repository
if config.GetBool("BackendConfig.Regulations.useBadgerDB", true) {
tmpDir, err := misc.CreateTMPDIR()

var pollInterval time.Duration
config.RegisterDurationConfigVariable(300, &pollInterval, true, time.Second, "BackendConfig.Regulations.pollInterval")

useBadgerDB := config.GetBool("BackendConfig.Regulations.useBadgerDB", true)
if useBadgerDB {
identifier := backendConfig.Identity()

fullSuppressionPath, latestSuppressionPath, err := getRepoPath()
if err != nil {
return nil, fmt.Errorf("could not create tmp dir: %w", err)
return nil, fmt.Errorf("could not get repo path: %w", err)
}
path := path.Join(tmpDir, "suppression")

// TODO: implement seeder source, to retrieve the initial state from a persisted backup
var seederSource func() (io.Reader, error)
if !alreadySynced(fullSuppressionPath) && config.IsSet("SUPPRESS_USER_BACKUP_SERVICE_URL") {
_ = os.RemoveAll(fullSuppressionPath)
_ = os.RemoveAll(latestSuppressionPath)

repository, err = NewBadgerRepository(
path,
m.Log,
WithSeederSource(seederSource),
WithMaxSeedWait(config.GetDuration("BackendConfig.Regulations.maxSeedWait", 5, time.Second)))
if err != nil {
return nil, fmt.Errorf("could not create badger repository: %w", err)
// First starting a repository seeded with the latest data which is faster to load
latestSyncer, latestRepo, err := m.newSyncerWithBadgerRepo(
latestSuppressionPath,
latestDataSeed,
config.GetDuration("BackendConfig.Regulations.maxSeedWait", 5, time.Second),
identifier,
pollInterval)
if err != nil {
return nil, err
}

subCtx, latestSyncCancel := context.WithCancel(ctx)
rruntime.Go(func() {
latestSyncer.SyncLoop(subCtx)
err = latestRepo.Stop()
if err != nil {
m.Log.Warnf("Latest Sync failed: could not stop repo: %w", err)
}
err = os.RemoveAll(latestSuppressionPath)
if err != nil {
m.Log.Errorf("Latest Sync failed: could not remove repo: %w", err)
}
})

repo := &RepoSwitcher{Repository: latestRepo}
rruntime.Go(func() {
var fullSyncer *Syncer
var fullRepo Repository
var err error

m.retryIndefinitely(ctx,
func() error {
fullSyncer, fullRepo, err = m.newSyncerWithBadgerRepo(fullSuppressionPath, fullDataSeed, 0, identifier, pollInterval)
return err
}, 5*time.Second)

m.retryIndefinitely(ctx,
func() error { return fullSyncer.Sync(ctx) },
5*time.Second)

_, err = os.Create(filepath.Join(fullSuppressionPath, model.SyncDoneMarker))
if err != nil {
saurav-malani marked this conversation as resolved.
Show resolved Hide resolved
m.Log.Errorf("Could not create sync done marker: %w", err)
}
repo.Switch(fullRepo)
latestSyncCancel()
fullSyncer.SyncLoop(ctx)
err = fullRepo.Stop()
if err != nil {
m.Log.Warnf("Full Sync failed: could not stop repo: %w", err)
}
})
return newHandler(repo, m.Log), nil
} else {
syncer, fullRepo, err := m.newSyncerWithBadgerRepo(fullSuppressionPath, nil, 0, identifier, pollInterval)
if err != nil {
return nil, err
}
rruntime.Go(func() {
syncer.SyncLoop(ctx)
err = fullRepo.Stop()
if err != nil {
m.Log.Warnf("could not stop full sync repo: %w", err)
}
})
return newHandler(fullRepo, m.Log), nil
}
} else {
repository = NewMemoryRepository(m.Log)
memoryRepo := NewMemoryRepository(m.Log)
syncer, err := NewSyncer(
config.GetString("SUPPRESS_USER_BACKEND_URL", "https://api.rudderstack.com"),
backendConfig.Identity(),
memoryRepo,
WithLogger(m.Log),
WithHttpClient(&http.Client{Timeout: config.GetDuration("HttpClient.suppressUser.timeout", 30, time.Second)}),
WithPageSize(config.GetInt("BackendConfig.Regulations.pageSize", 5000)),
WithPollIntervalFn(func() time.Duration { return pollInterval }),
)
if err != nil {
return nil, err
}
rruntime.Go(func() {
syncer.SyncLoop(ctx)
err = memoryRepo.Stop()
if err != nil {
m.Log.Warnf("Sync failed: could not stop repo: %w", err)
}
})
h := newHandler(memoryRepo, m.Log)

return h, nil
}
}

var pollInterval time.Duration
config.RegisterDurationConfigVariable(300, &pollInterval, true, time.Second, "BackendConfig.Regulations.pollInterval")
func alreadySynced(repoPath string) bool {
_, err := os.Stat(path.Join(repoPath, model.SyncDoneMarker))
return err == nil
}

func (m *Factory) retryIndefinitely(ctx context.Context, f func() error, wait time.Duration) {
var err error
for {
err = f()
if err == nil {
return
}
m.Log.Errorf("retry failed: %v", err)
Copy link
Collaborator

@fracasula fracasula Apr 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like it would be better to log the error in the anonymous function instead of logging it here. This way we can add some context to the error like:

m.retryIndefinitely(ctx, func() error {
	if _, err = os.Create(filepath.Join(fullSuppressionPath, model.SyncDoneMarker)); err != nil {
		return fmt.Errorf("could not create sync marker: %v", err)
	}
	return nil
}, 1*time.Second)

Also, I'm not sure trying these ops forever without anybody looking at the logs is a good idea.
For example, let's say that it fails forever at the 2nd step (i.e. fullSyncer.Sync(ctx)). How are we going to notice?

cc @atzoum

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alerting for sync failures is something we are missing for the whole suppression feature (not just for this scenario). Let's treat alerting in another task

select {
case <-ctx.Done():
return
case <-time.After(wait):
}
}
}

func (m *Factory) newSyncerWithBadgerRepo(repoPath string, seederSource func() (io.ReadCloser, error), maxSeedWaitTime time.Duration, identity identity.Identifier, pollInterval time.Duration) (*Syncer, Repository, error) {
repo, err := NewBadgerRepository(
repoPath,
m.Log,
WithSeederSource(seederSource),
WithMaxSeedWait(maxSeedWaitTime),
)
if err != nil {
return nil, nil, fmt.Errorf("could not create badger repository: %w", err)
}
syncer, err := NewSyncer(
config.GetString("SUPPRESS_USER_BACKEND_URL", "https://api.rudderstack.com"),
backendConfig.Identity(),
repository,
identity,
repo,
WithLogger(m.Log),
WithHttpClient(&http.Client{Timeout: config.GetDuration("HttpClient.suppressUser.timeout", 30, time.Second)}),
WithPageSize(config.GetInt("BackendConfig.Regulations.pageSize", 5000)),
WithPollIntervalFn(func() time.Duration { return pollInterval }),
)
if err != nil {
return nil, err
return nil, nil, err
}
return syncer, repo, nil
}

func getRepoPath() (fullSuppressionPath, latestSuppressionPath string, err error) {
tmpDir, err := misc.CreateTMPDIR()
if err != nil {
return "", "", fmt.Errorf("could not create tmp dir: %w", err)
}
fullSuppressionPath = path.Join(tmpDir, "fullSuppression")
latestSuppressionPath = path.Join(tmpDir, "latestSuppression")
return
}

h := newHandler(repository, m.Log)
func latestDataSeed() (io.ReadCloser, error) {
return seederSource("latest-export")
}

rruntime.Go(func() {
syncer.SyncLoop(ctx)
_ = repository.Stop()
})
func fullDataSeed() (io.ReadCloser, error) {
return seederSource("full-export")
}

return h, nil
func seederSource(endpoint string) (io.ReadCloser, error) {
client := http.Client{}
baseURL := config.GetString("SUPPRESS_USER_BACKUP_SERVICE_URL", "https://api.rudderstack.com")
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", baseURL, endpoint), http.NoBody)
if err != nil {
return nil, fmt.Errorf("could not create request: %w", err)
}
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("could not perform request: %w", err)
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
// close body afterwards.
return resp.Body, nil
}
123 changes: 123 additions & 0 deletions enterprise/suppress-user/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package suppression

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/gorilla/mux"
saurav-malani marked this conversation as resolved.
Show resolved Hide resolved
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/admin"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/stretchr/testify/require"
)

func TestSuppressionSetup(t *testing.T) {
srv := httptest.NewServer(httpHandler(t))
defer t.Cleanup(srv.Close)
t.Setenv("WORKSPACE_TOKEN", "216Co97d9So9TkqphM0cxBzRxc3")
t.Setenv("CONFIG_BACKEND_URL", srv.URL)
t.Setenv("SUPPRESS_USER_BACKUP_SERVICE_URL", srv.URL)
t.Setenv("SUPPRESS_BACKUP_URL", srv.URL)
t.Setenv("SUPPRESS_USER_BACKEND_URL", srv.URL)
dir, err := os.MkdirTemp("/tmp", "rudder-server")
require.NoError(t, err)
t.Setenv("RUDDER_TMPDIR", dir)
defer os.RemoveAll(dir)
config.Set("Diagnostics.enableDiagnostics", false)
admin.Init()
misc.Init()
diagnostics.Init()
backendconfig.Init()

require.NoError(t, backendconfig.Setup(nil))
defer backendconfig.DefaultBackendConfig.Stop()
backendconfig.DefaultBackendConfig.StartWithIDs(context.TODO(), "")

t.Run(
"should setup badgerdb and syncer successfully after getting suppression from backup service",
func(t *testing.T) {
f := Factory{
EnterpriseToken: "token",
Log: logger.NOP,
}
ctx, cancel := context.WithCancel(context.Background())
h, err := f.Setup(ctx, backendconfig.DefaultBackendConfig)
require.NoError(t, err, "Error in setting up suppression feature")
v := h.IsSuppressedUser("workspace-1", "user-1", "src-1")
require.True(t, v)
require.Eventually(t, func() bool {
return h.IsSuppressedUser("workspace-2", "user-2", "src-4")
}, time.Second*15, time.Millisecond*100, "User should be suppressed")
cancel()
time.Sleep(time.Second * 2)
h2, err := f.Setup(context.Background(), backendconfig.DefaultBackendConfig)
require.NoError(t, err, "Error in setting up suppression feature")
require.True(t, h2.IsSuppressedUser("workspace-2", "user-2", "src-4"))
},
)
t.Run(
"should setup in-memory suppression db",
func(t *testing.T) {
f := Factory{
EnterpriseToken: "token",
Log: logger.NOP,
}
t.Setenv("RSERVER_BACKEND_CONFIG_REGULATIONS_USE_BADGER_DB", "false")
h, err := f.Setup(context.Background(), backendconfig.DefaultBackendConfig)
require.NoError(t, err, "Error in setting up suppression feature")
require.False(t, h.IsSuppressedUser("workspace-1", "user-1", "src-1"))
},
)
}

func httpHandler(t *testing.T) http.Handler {
t.Helper()
srvMux := mux.NewRouter()
srvMux.HandleFunc("/workspaceConfig", getSingleTenantWorkspaceConfig).Methods(http.MethodGet)
srvMux.HandleFunc("/full-export", func(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, "testdata/full-export") }).Methods(http.MethodGet)
srvMux.HandleFunc("/latest-export", func(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, "testdata/latest-export") }).Methods(http.MethodGet)
srvMux.HandleFunc("/dataplane/workspaces/{workspace_id}/regulations/suppressions", getSuppressionFromManager).Methods(http.MethodGet)
srvMux.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
next.ServeHTTP(w, req)
})
})

return srvMux
}

func getSingleTenantWorkspaceConfig(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
config := backendconfig.ConfigT{
WorkspaceID: "reg-test-workspaceId",
}
body, err := json.Marshal(config)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, _ = w.Write(body)
}

func getSuppressionFromManager(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
s := suppressionsResponse{
Items: []model.Suppression{},
Token: "_token_",
}
body, err := json.Marshal(s)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, _ = w.Write(body)
}
Loading