Skip to content

Commit

Permalink
chore: gateway backend config initialisation improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Jul 31, 2023
1 parent 1cdc6d1 commit 9d6baee
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 27 deletions.
22 changes: 12 additions & 10 deletions backend-config/backend-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"sync"
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -156,7 +158,7 @@ func filterProcessorEnabledDestinations(config ConfigT) ConfigT {
return modifiedConfig
}

func (bc *backendConfigImpl) configUpdate(ctx context.Context, workspaces string) {
func (bc *backendConfigImpl) configUpdate(ctx context.Context) {
statConfigBackendError := stats.Default.NewStat("config_backend.errors", stats.CountType)

var (
Expand Down Expand Up @@ -210,11 +212,11 @@ func (bc *backendConfigImpl) configUpdate(ctx context.Context, workspaces string

bc.curSourceJSONLock.Lock()
if !reflect.DeepEqual(bc.curSourceJSON, sourceJSON) {
if len(workspaces) > 0 {
pkgLogger.Infof("Workspace Config changed: %d", len(workspaces))
} else {
pkgLogger.Infof("Workspace Config changed")
}

pkgLogger.Infow("Workspace Config changed",
"workspaces", len(sourceJSON),
"sources", lo.Sum(lo.Map(lo.Values(sourceJSON), func(c ConfigT, _ int) int { return len(c.Sources) })),
)

if len(sourceJSON) == 1 { // only use diagnostics if there is one workspace
for _, wConfig := range sourceJSON {
Expand All @@ -236,9 +238,9 @@ func (bc *backendConfigImpl) configUpdate(ctx context.Context, workspaces string
bc.initializedLock.Unlock()
}

func (bc *backendConfigImpl) pollConfigUpdate(ctx context.Context, workspaces string) {
func (bc *backendConfigImpl) pollConfigUpdate(ctx context.Context) {
for {
bc.configUpdate(ctx, workspaces)
bc.configUpdate(ctx)

select {
case <-ctx.Done():
Expand Down Expand Up @@ -317,7 +319,7 @@ func Setup(configEnvHandler types.ConfigEnvI) (err error) {
return nil
}

func (bc *backendConfigImpl) StartWithIDs(ctx context.Context, workspaces string) {
func (bc *backendConfigImpl) StartWithIDs(ctx context.Context, _ string) {
var err error
ctx, cancel := context.WithCancel(ctx)
bc.ctx = ctx
Expand Down Expand Up @@ -352,7 +354,7 @@ func (bc *backendConfigImpl) StartWithIDs(ctx context.Context, workspaces string
}

rruntime.Go(func() {
bc.pollConfigUpdate(ctx, workspaces)
bc.pollConfigUpdate(ctx)
close(bc.blockChan)
})
}
Expand Down
13 changes: 5 additions & 8 deletions backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func TestConfigUpdate(t *testing.T) {
ctx = context.Background()
fakeError = errors.New("fake error")
cacheError = errors.New("cache error")
workspaces = "foo"
cacheStore = cache.NewMockCache(ctrl)
)
defer ctrl.Finish()
Expand All @@ -202,7 +201,7 @@ func TestConfigUpdate(t *testing.T) {
cache: cacheStore,
}
cacheStore.EXPECT().Get(ctx).Return([]byte{}, cacheError).Times(1)
bc.configUpdate(ctx, workspaces)
bc.configUpdate(ctx)
require.False(t, bc.initialized)
})

Expand All @@ -223,7 +222,7 @@ func TestConfigUpdate(t *testing.T) {
curSourceJSON: map[string]ConfigT{workspaces: sampleBackendConfig}, // same as the one returned by the workspace config
cache: cacheStore,
}
bc.configUpdate(ctx, workspaces)
bc.configUpdate(ctx)
require.True(t, bc.initialized)
})

Expand Down Expand Up @@ -251,7 +250,7 @@ func TestConfigUpdate(t *testing.T) {
chProcess := pubSub.Subscribe(ctx, string(TopicProcessConfig))
chBackend := pubSub.Subscribe(ctx, string(TopicBackendConfig))

bc.configUpdate(ctx, workspaces)
bc.configUpdate(ctx)
require.True(t, bc.initialized)
require.Equal(t, (<-chProcess).Data, map[string]ConfigT{workspaces: sampleFilteredSources})
require.Equal(t, (<-chBackend).Data, map[string]ConfigT{workspaces: sampleBackendConfig})
Expand Down Expand Up @@ -433,7 +432,6 @@ func TestCache(t *testing.T) {
var (
ctrl = gomock.NewController(t)
ctx, cancel = context.WithCancel(context.Background())
workspaces = "foo"
cacheStore = cache.NewMockCache(ctrl)
)
defer ctrl.Finish()
Expand All @@ -457,7 +455,7 @@ func TestCache(t *testing.T) {
chProcess := pubSub.Subscribe(ctx, string(TopicProcessConfig))
chBackend := pubSub.Subscribe(ctx, string(TopicBackendConfig))

bc.configUpdate(ctx, workspaces)
bc.configUpdate(ctx)
require.True(t, bc.initialized)
require.Equal(t, (<-chProcess).Data, map[string]ConfigT{sampleWorkspaceID: sampleFilteredSources})
require.Equal(t, (<-chBackend).Data, unmarshalledConfig)
Expand All @@ -467,7 +465,6 @@ func TestCache(t *testing.T) {
var (
ctrl = gomock.NewController(t)
ctx, cancel = context.WithCancel(context.Background())
workspaces = "foo"
cacheStore = cache.NewMockCache(ctrl)
)
defer ctrl.Finish()
Expand All @@ -484,7 +481,7 @@ func TestCache(t *testing.T) {
}
bc.curSourceJSON = map[string]ConfigT{sampleWorkspaceID: sampleBackendConfig2}

bc.configUpdate(ctx, workspaces)
bc.configUpdate(ctx)
require.False(t, bc.initialized)
})

Expand Down
31 changes: 22 additions & 9 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/rudderlabs/rudder-server/gateway/webhook"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/middleware"
"github.com/rudderlabs/rudder-server/rruntime"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/services/rsources"
Expand Down Expand Up @@ -201,6 +200,9 @@ type HandleT struct {
rsourcesService rsources.JobService
sourcehandle sourcedebugger.SourceDebugger
whProxy http.Handler

configInitialised bool
configInitialisedChan chan struct{}
}

// Part of the gateway module Setup call.
Expand Down Expand Up @@ -1307,7 +1309,7 @@ This function will block.
*/
func (gateway *HandleT) StartWebHandler(ctx context.Context) error {
gateway.logger.Infof("WebHandler waiting for BackendConfig before starting on %d", webPort)
gateway.backendConfig.WaitForConfig(ctx)
<-gateway.configInitialisedChan
gateway.logger.Infof("WebHandler Starting on %d", webPort)
component := "gateway"
srvMux := chi.NewRouter()
Expand Down Expand Up @@ -1403,7 +1405,7 @@ func (gateway *HandleT) StartWebHandler(ctx context.Context) error {
// StartAdminHandler for Admin Operations
func (gateway *HandleT) StartAdminHandler(ctx context.Context) error {
gateway.logger.Infof("AdminHandler waiting for BackendConfig before starting on %d", adminWebPort)
gateway.backendConfig.WaitForConfig(ctx)
<-gateway.configInitialisedChan
gateway.logger.Infof("AdminHandler starting on %d", adminWebPort)
component := "gateway"
srvMux := chi.NewRouter()
Expand All @@ -1421,8 +1423,16 @@ func (gateway *HandleT) StartAdminHandler(ctx context.Context) error {
}

// Gets the config from config backend and extracts enabled writekeys
func (gateway *HandleT) backendConfigSubscriber() {
ch := gateway.backendConfig.Subscribe(context.TODO(), backendconfig.TopicProcessConfig)
func (gateway *HandleT) backendConfigSubscriber(ctx context.Context) {
closeConfigChan := func(sources int) {
if !gateway.configInitialised {
gateway.logger.Infow("BackendConfig initialised", "sources", sources)
gateway.configInitialised = true
close(gateway.configInitialisedChan)
}
}
defer closeConfigChan(0)
ch := gateway.backendConfig.Subscribe(ctx, backendconfig.TopicProcessConfig)
for data := range ch {
var (
newWriteKeysSourceMap = map[string]backendconfig.SourceT{}
Expand Down Expand Up @@ -1450,6 +1460,7 @@ func (gateway *HandleT) backendConfigSubscriber() {
enabledWriteKeyWebhookMap = newEnabledWriteKeyWebhookMap
enabledWriteKeyWorkspaceMap = newEnabledWriteKeyWorkspaceMap
configSubscriberLock.Unlock()
closeConfigChan(len(writeKeysSourceMap))
}
}

Expand Down Expand Up @@ -1558,6 +1569,7 @@ func (gateway *HandleT) Setup(
tr := &http.Transport{}
client := &http.Client{Transport: tr, Timeout: gateway.httpTimeout}
gateway.netHandle = client
gateway.configInitialisedChan = make(chan struct{})

// For the lack of better stat type, using TimerType.
gateway.batchSizeStat = gateway.stats.NewStat("gateway.batch_size", stats.HistogramType)
Expand Down Expand Up @@ -1603,17 +1615,18 @@ func (gateway *HandleT) Setup(
gateway.eventSchemaHandler = event_schema.GetInstance()
}

rruntime.Go(func() {
gateway.backendConfigSubscriber()
})

ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

gateway.backgroundCancel = cancel
gateway.backgroundWait = g.Wait
gateway.initUserWebRequestWorkers()

g.Go(misc.WithBugsnag(func() error {
gateway.backendConfigSubscriber(ctx)
return nil
}))

g.Go(misc.WithBugsnag(func() error {
gateway.runUserWebRequestWorkers(ctx)
return nil
Expand Down

0 comments on commit 9d6baee

Please sign in to comment.