Skip to content

Commit

Permalink
minor fix
Browse files Browse the repository at this point in the history
  • Loading branch information
saurav-malani committed Mar 30, 2023
1 parent 3d80d64 commit e4b0ffb
Showing 1 changed file with 67 additions and 25 deletions.
92 changes: 67 additions & 25 deletions enterprise/suppress-user/factory.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package suppression

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"path"
"sync/atomic"
"sync"
"time"
"unsafe"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
Expand Down Expand Up @@ -45,16 +44,12 @@ func seederSource(endpoint string) (io.Reader, error) {
if err != nil {
return nil, fmt.Errorf("could not perform request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not read response body: %w", err)
}
return bytes.NewReader(respBody), nil
// close body afterwards.
return resp.Body, nil
}

// Setup initializes the user suppression feature
Expand All @@ -78,12 +73,10 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend
latestRepo, err = NewBadgerRepository(
path,
m.Log,
WithSeederSource(latestDataSeed),
WithMaxSeedWait(config.GetDuration("BackendConfig.Regulations.maxSeedWait", 600, time.Second)))
WithSeederSource(latestDataSeed))
if err != nil {
return nil, fmt.Errorf("could not create badger repository: %w", err)
}

} else {
latestRepo = NewMemoryRepository(m.Log)
}
Expand All @@ -104,14 +97,17 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend
return nil, err
}

repo := &latestRepo
subCtx, LatestSyncCancel := context.WithCancel(ctx)
// repo := &latestRepo
subCtx, latestSyncCancel := context.WithCancel(ctx)
rruntime.Go(func() {
syncer.SyncLoop(subCtx)
err = latestRepo.Stop()
defer LatestSyncCancel()
defer latestSyncCancel()
})

repo := &RepoHolder{
Repository: latestRepo,
mu: sync.RWMutex{},
}
if config.GetBool("BackendConfig.Regulations.useBadgerDB", true) {
rruntime.Go(func() {
tmpDir, err := misc.CreateTMPDIR()
Expand All @@ -123,8 +119,7 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend
fullRepo, err := NewBadgerRepository(
path,
m.Log,
WithSeederSource(fullDataSeed),
WithMaxSeedWait(config.GetDuration("BackendConfig.Regulations.maxSeedWait", 600, time.Second)))
WithSeederSource(fullDataSeed))
if err != nil {
m.Log.Error("Complete Synce failed: could not create badger repository: %w", err)
return
Expand All @@ -142,18 +137,65 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend
m.Log.Error("Complete Synce failed: could not create syncer: %w", err)
return
}

rruntime.Go(func() {
atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(repo)), unsafe.Pointer(&fullRepo))
LatestSyncCancel()
})

if err = syncer.Sync(ctx); err != nil {
m.Log.Error("Complete Synce failed: could not sync: %w", err)
}
latestSyncCancel()
repo.Switcher(fullRepo)
syncer.SyncLoop(ctx)
_ = fullRepo.Stop()
})
}

h := newHandler(*repo, m.Log)
h := newHandler(repo, m.Log)

return h, nil
}

type RepoHolder struct {
Repository
mu sync.RWMutex
}

func (rh *RepoHolder) Stop() error {
rh.mu.Lock()
defer rh.mu.Unlock()
return rh.Repository.Stop()
}

func (rh *RepoHolder) GetToken() ([]byte, error) {
rh.mu.RLock()
defer rh.mu.RUnlock()
return rh.Repository.GetToken()
}

func (rh *RepoHolder) Add(suppressions []model.Suppression, token []byte) error {
rh.mu.Lock()
defer rh.mu.Unlock()
return rh.Repository.Add(suppressions, token)
}

func (rh *RepoHolder) Suppressed(workspaceID, userID, sourceID string) (bool, error) {
rh.mu.RLock()
defer rh.mu.RUnlock()
return rh.Repository.Suppressed(workspaceID, userID, sourceID)
}

func (rh *RepoHolder) Backup(w io.Writer) error {
rh.mu.RLock()
defer rh.mu.RUnlock()
return rh.Repository.Backup(w)
}

func (rh *RepoHolder) Restore(r io.Reader) error {
rh.mu.Lock()
defer rh.mu.Unlock()
return rh.Repository.Restore(r)
}

func (rh *RepoHolder) Switcher(newRepo Repository) error {
rh.mu.Lock()
defer rh.mu.Unlock()
rh.Repository = newRepo
return nil
}

0 comments on commit e4b0ffb

Please sign in to comment.