Skip to content

Commit

Permalink
fix: backend config refactoring (#2200)
Browse files: browse the repository at this point in the history
* fix: backend config WaitForConfig() error

* chore: fixing log formatting

* fix: gateway setup error propagation

* chore: lifecycle Start() to support error

* chore: dynamic test to support Start() error

* chore: cleanup app.go

* chore: suppress user setup with error

* chore: update Mocklifecycle

* chore: making linter happy

* chore: updating BeforeEach for gw enterprise

* chore: handling some errors

* chore: avoid blocking in processor/manager

* fix: remove Fatal before defer

* chore: use http.NoBody

* chore: skipcq: CRT-P0006

* chore: use %q instead of "%s"

* chore: WaitForConfig improvements

* chore: use %q instead of "%s"

* chore: updating test

* chore: simplifying workspace change logic

* fix: mutexes logic order

* chore: cannot handle workspace change if empty

* chore: silencing DeepSource

* chore: signal.NotifyContext

* chore: backend config Setup() error

* chore: updating mock

* chore: remove error from WaitForConfig() + improvements

* chore: kubernetes health probes

* chore: updating tests

* chore: updating tests

* chore: unused method receiver RVV-B0013

* chore: use http.NoBody

* chore: use %q instead of "%s" for quoted strings

* chore: liveness/readiness logic

* chore: renaming health endpoints

* chore: removing unnecessary mock assertion from test

* chore: Update workspaceIDs with error

* chore: checking backend config authentication

* chore: main.go shutdown improvements

* chore: using return to run defers

* chore: proper use of exitCode

* chore: multi tenant test to cover EMBEDDED + new process

* chore: testMultiTenantByAppType

* chore: refactoring to ignore workspaceIDs

* chore: restoring /

* test: empty workspaces trigger a reload
  • Loading branch information
fracasula committed Aug 2, 2022
1 parent faa034d commit e242c7d
Show file tree
Hide file tree
Showing 40 changed files with 564 additions and 563 deletions.
28 changes: 19 additions & 9 deletions app/app.go
Expand Up @@ -115,15 +115,15 @@ func (a *App) Stop() {
if a.options.Cpuprofile != "" {
pkgLogger.Info("Stopping CPU profile")
pprof.StopCPUProfile()
a.cpuprofileOutput.Close()
_ = a.cpuprofileOutput.Close()
}

if a.options.Memprofile != "" {
f, err := os.Create(a.options.Memprofile)
if err != nil {
panic(err)
}
defer f.Close()
defer func() { _ = f.Close() }()
runtime.GC() // get up-to-date statistics
err = pprof.WriteHeapProfile(f)
if err != nil {
Expand All @@ -139,22 +139,32 @@ func New(options *Options) Interface {
}
}

// HealthHandler is the http handler for health endpoint
func HealthHandler(w http.ResponseWriter, r *http.Request, jobsDB jobsdb.JobsDB) {
var dbService string = "UP"
var enabledRouter string = "TRUE"
var backendConfigMode string = "API"
// LivenessHandler returns the HTTP handler served to the Kubernetes liveness probe.
// The request itself is ignored: every call is answered with the health
// document that getHealthVal builds for the given jobsDB.
func LivenessHandler(jobsDB jobsdb.JobsDB) http.HandlerFunc {
	serve := func(w http.ResponseWriter, _ *http.Request) {
		body := []byte(getHealthVal(jobsDB))
		// The write result is explicitly discarded.
		_, _ = w.Write(body)
	}
	return serve
}

func getHealthVal(jobsDB jobsdb.JobsDB) string {
dbService := "UP"
if jobsDB.Ping() != nil {
dbService = "DOWN"
}
enabledRouter := "TRUE"
if !config.GetBool("enableRouter", true) {
enabledRouter = "FALSE"
}
backendConfigMode := "API"
if config.GetBool("BackendConfig.configFromFile", false) {
backendConfigMode = "JSON"
}

appTypeStr := strings.ToUpper(config.GetEnv("APP_TYPE", EMBEDDED))
healthVal := fmt.Sprintf(`{"appType": "%s", "server":"UP", "db":"%s","acceptingEvents":"TRUE","routingEvents":"%s","mode":"%s", "backendConfigMode": "%s", "lastSync":"%s", "lastRegulationSync":"%s"}`, appTypeStr, dbService, enabledRouter, strings.ToUpper(db.CurrentMode), backendConfigMode, backendconfig.LastSync, backendconfig.LastRegulationSync)
_, _ = w.Write([]byte(healthVal))
return fmt.Sprintf(
`{"appType":"%s","server":"UP","db":"%s","acceptingEvents":"TRUE","routingEvents":"%s","mode":"%s",`+
`"backendConfigMode":"%s","lastSync":"%s","lastRegulationSync":"%s"}`,
appTypeStr, dbService, enabledRouter, strings.ToUpper(db.CurrentMode),
backendConfigMode, backendconfig.LastSync, backendconfig.LastRegulationSync,
)
}
27 changes: 20 additions & 7 deletions app/apphandlers/embeddedAppHandler.go
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/rudderlabs/rudder-server/utils/types/servermode"
)

// EmbeddedApp is the type for embedded type implemention
// EmbeddedApp is the type for embedded type implementation
type EmbeddedApp struct {
App app.Interface
VersionHandler func(w http.ResponseWriter, r *http.Request)
Expand All @@ -43,7 +43,7 @@ func (*EmbeddedApp) GetAppType() string {
}

func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.Options) error {
pkgLogger.Info("Main starting")
pkgLogger.Info("Embedded mode: Starting Rudder Core")

rudderCoreDBValidator()
rudderCoreWorkSpaceTableSetup()
Expand Down Expand Up @@ -148,11 +148,10 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
clearDB := false
if enableProcessor {
g.Go(misc.WithBugsnag(func() error {
StartProcessor(
return StartProcessor(
ctx, &clearDB, gwDBForProcessor, routerDB, batchRouterDB, errDB,
reportingI, multitenant.NOOP, transientSources, rsourcesService,
)
return nil
}))
}
}
Expand Down Expand Up @@ -244,11 +243,19 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithQueryFilterKeys(jobsdb.QueryFiltersT{}),
)
defer gwDBForProcessor.Close()
gatewayDB.Start()
if err = gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gateway: %w", err)
}
defer gatewayDB.Stop()

gw.SetReadonlyDBs(&readonlyGatewayDB, &readonlyRouterDB, &readonlyBatchRouterDB)
gw.Setup(embedded.App, backendconfig.DefaultBackendConfig, gatewayDB, &rateLimiter, embedded.VersionHandler, rsourcesService)
err = gw.Setup(
embedded.App, backendconfig.DefaultBackendConfig, gatewayDB,
&rateLimiter, embedded.VersionHandler, rsourcesService,
)
if err != nil {
return fmt.Errorf("could not setup gateway: %w", err)
}
defer func() {
if err := gw.Shutdown(); err != nil {
pkgLogger.Warnf("Gateway shutdown error: %v", err)
Expand All @@ -265,7 +272,13 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O

if enableReplay {
var replayDB jobsdb.HandleT
replayDB.Setup(jobsdb.ReadWrite, options.ClearDB, "replay", routerDBRetention, migrationMode, true, jobsdb.QueryFiltersT{}, prebackupHandlers)
err := replayDB.Setup(
jobsdb.ReadWrite, options.ClearDB, "replay", routerDBRetention,
migrationMode, true, jobsdb.QueryFiltersT{}, prebackupHandlers,
)
if err != nil {
return fmt.Errorf("could not setup replayDB: %w", err)
}
defer replayDB.TearDown()
embedded.App.Features().Replay.Setup(&replayDB, gatewayDB, routerDB, batchRouterDB)
}
Expand Down
12 changes: 10 additions & 2 deletions app/apphandlers/gatewayAppHandler.go
Expand Up @@ -59,7 +59,9 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.
jobsdb.WithQueryFilterKeys(jobsdb.QueryFiltersT{}),
)
defer gatewayDB.Close()
gatewayDB.Start()
if err := gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gatewayDB: %w", err)
}
defer gatewayDB.Stop()

enableGateway := true
Expand Down Expand Up @@ -108,7 +110,13 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.
if err != nil {
return err
}
gw.Setup(gatewayApp.App, backendconfig.DefaultBackendConfig, gatewayDB, &rateLimiter, gatewayApp.VersionHandler, rsourcesService)
err = gw.Setup(
gatewayApp.App, backendconfig.DefaultBackendConfig, gatewayDB,
&rateLimiter, gatewayApp.VersionHandler, rsourcesService,
)
if err != nil {
return fmt.Errorf("failed to setup gateway: %w", err)
}
defer func() {
if err := gw.Shutdown(); err != nil {
pkgLogger.Warnf("Gateway shutdown error: %v", err)
Expand Down
11 changes: 4 additions & 7 deletions app/apphandlers/processorAppHandler.go
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/bugsnag/bugsnag-go/v2"
"github.com/gorilla/mux"

"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
"github.com/rudderlabs/rudder-server/app/cluster/state"
Expand Down Expand Up @@ -168,7 +169,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
g.Go(func() error {
clearDB := false
if enableProcessor {
StartProcessor(
return StartProcessor(
ctx, &clearDB, gwDBForProcessor, routerDB, batchRouterDB, errDB,
reportingI, multitenant.NOOP, transientSources, rsourcesService,
)
Expand Down Expand Up @@ -272,8 +273,8 @@ func startHealthWebHandler(ctx context.Context) error {
// Port where Processor health handler is running
pkgLogger.Infof("Starting in %d", webPort)
srvMux := mux.NewRouter()
srvMux.HandleFunc("/health", healthHandler)
srvMux.HandleFunc("/", healthHandler)
srvMux.HandleFunc("/health", app.LivenessHandler(gatewayDB))
srvMux.HandleFunc("/", app.LivenessHandler(gatewayDB))
srv := &http.Server{
Addr: ":" + strconv.Itoa(webPort),
Handler: bugsnag.Handler(srvMux),
Expand All @@ -294,7 +295,3 @@ func startHealthWebHandler(ctx context.Context) error {

return g.Wait()
}

func healthHandler(w http.ResponseWriter, r *http.Request) {
app.HealthHandler(w, r, gatewayDB)
}
13 changes: 8 additions & 5 deletions app/apphandlers/setup.go
Expand Up @@ -117,16 +117,19 @@ func StartProcessor(
ctx context.Context, clearDB *bool, gatewayDB, routerDB, batchRouterDB,
procErrorDB *jobsdb.HandleT, reporting types.ReportingI, multitenantStat multitenant.MultiTenantI,
transientSources transientsource.Service, rsourcesService rsources.JobService,
) {
) error {
if !processorLoaded.First() {
pkgLogger.Debug("processor started by an other go routine")
return
pkgLogger.Debug("processor started by another go routine")
return nil
}

processorInstance := processor.NewProcessor()
processorInstance.Setup(backendconfig.DefaultBackendConfig, gatewayDB, routerDB, batchRouterDB, procErrorDB, clearDB, reporting, multitenantStat, transientSources, rsourcesService)
processorInstance.Setup(
backendconfig.DefaultBackendConfig, gatewayDB, routerDB, batchRouterDB, procErrorDB,
clearDB, reporting, multitenantStat, transientSources, rsourcesService,
)
defer processorInstance.Shutdown()
processorInstance.Start(ctx)
return processorInstance.Start(ctx)
}

// StartRouter atomically starts router process if not already started
Expand Down
12 changes: 6 additions & 6 deletions app/cluster/configlifecycle_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 38 additions & 27 deletions app/cluster/dynamic.go
Expand Up @@ -26,15 +26,15 @@ type ChangeEventProvider interface {
}

type lifecycle interface {
Start()
Start() error
Stop()
}

//go:generate mockgen -destination=./configlifecycle_mock_test.go -package=cluster_test -source=./dynamic.go configLifecycle
type configLifecycle interface {
Stop()
StartWithIDs(ctx context.Context, workspaces string)
WaitForConfig(ctx context.Context) error
WaitForConfig(ctx context.Context)
}

type Dynamic struct {
Expand Down Expand Up @@ -126,39 +126,49 @@ func (d *Dynamic) Run(ctx context.Context) error {

d.logger.Infof("Got trigger to change workspaceIDs: %q", ids)
err := d.handleWorkspaceChange(ctx, ids)
if ackErr := req.Ack(ctx, err); ackErr != nil {
return fmt.Errorf("ack workspaceIDs change with error: %v: %w", err, ackErr)
}
if err != nil {
d.logger.Debugf("Could not handle workspaceIDs change: %v", err)
if ackErr := req.Ack(ctx, err); ackErr != nil {
return fmt.Errorf("ack workspaceIDs change with error: %v: %w", err, ackErr)
}
return err
}

d.logger.Debugf("Acknowledging the workspaceIDs change")
if err := req.Ack(ctx, nil); err != nil {
return fmt.Errorf("ack workspaceIDs change: %w", err)
}
d.logger.Debug("WorkspaceIDs changed")
}
}
}

func (d *Dynamic) start() {
func (d *Dynamic) start() error {
if d.GatewayComponent {
return
return nil
}
d.logger.Info("Starting the server")
start := time.Now()
d.ErrorDB.Start()
d.GatewayDB.Start()
d.RouterDB.Start()
d.BatchRouterDB.Start()

d.MultiTenantStat.Start()

d.Processor.Start()
d.Router.Start()
if err := d.ErrorDB.Start(); err != nil {
return fmt.Errorf("error db start: %w", err)
}
if err := d.GatewayDB.Start(); err != nil {
return fmt.Errorf("gateway db start: %w", err)
}
if err := d.RouterDB.Start(); err != nil {
return fmt.Errorf("router db start: %w", err)
}
if err := d.BatchRouterDB.Start(); err != nil {
return fmt.Errorf("batch router db start: %w", err)
}
if err := d.MultiTenantStat.Start(); err != nil {
return fmt.Errorf("multi tenant stat start: %w", err)
}
if err := d.Processor.Start(); err != nil {
return fmt.Errorf("processor start: %w", err)
}
if err := d.Router.Start(); err != nil {
return fmt.Errorf("router start: %w", err)
}
d.serverStartTimeStat.SendTiming(time.Since(start))
d.serverStartCountStat.Increment()
return nil
}

func (d *Dynamic) stop() {
Expand All @@ -181,13 +191,11 @@ func (d *Dynamic) stop() {
}

func (d *Dynamic) handleWorkspaceChange(ctx context.Context, workspaces string) error {
if d.currentWorkspaceIDs == workspaces {
return nil
}
d.BackendConfig.Stop()
d.BackendConfig.StartWithIDs(ctx, workspaces)
d.currentWorkspaceIDs = workspaces
return d.BackendConfig.WaitForConfig(ctx)
d.BackendConfig.WaitForConfig(ctx)
return nil
}

func (d *Dynamic) handleModeChange(newMode servermode.Mode) error {
Expand All @@ -210,16 +218,19 @@ func (d *Dynamic) handleModeChange(newMode servermode.Mode) error {
d.logger.Info("Transiting the server from NormalMode to DegradedMode")
d.stop()
default:
d.logger.Errorf("Unsupported transition from NormalMode to %s \n", newMode)
d.logger.Errorf("Unsupported transition from NormalMode to %s", newMode)
return fmt.Errorf("unsupported transition from NormalMode to %s", newMode)
}
case servermode.DegradedMode:
switch newMode {
case servermode.NormalMode:
d.logger.Info("Transiting the server from DegradedMode to NormalMode")
d.start()
if err := d.start(); err != nil {
d.logger.Errorf("Failed to start the server: %v", err)
return fmt.Errorf("failed to start the server: %w", err)
}
default:
d.logger.Errorf("Unsupported transition from DegradedMode to %s \n", newMode)
d.logger.Errorf("Unsupported transition from DegradedMode to %s", newMode)
return fmt.Errorf("unsupported transition from DegradedMode to %s", newMode)
}
}
Expand Down

0 comments on commit e242c7d

Please sign in to comment.