Skip to content

Commit

Permalink
chore: master pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 19, 2023
2 parents 3e6feac + 4bf9ce7 commit 6ef6694
Show file tree
Hide file tree
Showing 119 changed files with 3,724 additions and 1,249 deletions.
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ issues:
- errcheck
- unparam

- path: 'cmd/rudder-cli/status/status.go'
linters:
- bodyclose

linters-settings:
depguard:
rules:
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ RUN BUILD_DATE=$(date "+%F,%T") \
make build

RUN go build -o devtool ./cmd/devtool/

RUN go build -o rudder-cli ./cmd/rudder-cli/

FROM alpine:${ALPINE_VERSION}

Expand All @@ -39,10 +39,10 @@ COPY --from=builder rudder-server/rudder-server .
COPY --from=builder rudder-server/build/wait-for-go/wait-for-go .
COPY --from=builder rudder-server/build/regulation-worker .
COPY --from=builder rudder-server/devtool .
COPY --from=builder rudder-server/rudder-cli /usr/bin/rudder-cli

COPY build/docker-entrypoint.sh /
COPY build/wait-for /
COPY ./rudder-cli/rudder-cli.linux.x86_64 /usr/bin/rudder-cli
COPY scripts/generate-event /scripts/generate-event
COPY scripts/batch.json /scripts/batch.json

Expand Down
35 changes: 15 additions & 20 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,19 @@ type embeddedApp struct {
log logger.Logger
config struct {
enableReplay bool
processorDSLimit int
routerDSLimit int
batchRouterDSLimit int
gatewayDSLimit int
processorDSLimit misc.ValueLoader[int]
routerDSLimit misc.ValueLoader[int]
batchRouterDSLimit misc.ValueLoader[int]
gatewayDSLimit misc.ValueLoader[int]
}
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func (a *embeddedApp) loadConfiguration() {
a.config.enableReplay = config.GetBoolVar(types.DefaultReplayEnabled, "Replay.enabled")
config.RegisterIntConfigVariable(0, &a.config.processorDSLimit, true, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.gatewayDSLimit, true, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.routerDSLimit, true, 1, "Router.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.batchRouterDSLimit, true, 1, "BatchRouter.jobsDB.dsLimit", "JobsDB.dsLimit")
}

func (a *embeddedApp) Setup(options *app.Options) error {
a.loadConfiguration()
a.config.enableReplay = config.GetBoolVar(types.DefaultReplayEnabled, "Replay.enabled")
a.config.processorDSLimit = config.GetReloadableIntVar(0, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit")
a.config.gatewayDSLimit = config.GetReloadableIntVar(0, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
a.config.routerDSLimit = config.GetReloadableIntVar(0, 1, "Router.jobsDB.dsLimit", "JobsDB.dsLimit")
a.config.batchRouterDSLimit = config.GetReloadableIntVar(0, 1, "BatchRouter.jobsDB.dsLimit", "JobsDB.dsLimit")

if err := db.HandleEmbeddedRecovery(options.NormalMode, options.DegradedMode, misc.AppStartTime, app.EMBEDDED); err != nil {
return err
Expand Down Expand Up @@ -153,7 +148,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.gatewayDSLimit),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
)
Expand All @@ -162,7 +157,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
"rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.routerDSLimit),
jobsdb.WithDSLimit(a.config.routerDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
)
Expand All @@ -171,7 +166,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
"batch_rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.batchRouterDSLimit),
jobsdb.WithDSLimit(a.config.batchRouterDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
)
Expand All @@ -182,7 +177,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
)
Expand All @@ -200,15 +195,15 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
schemaDB := jobsdb.NewForReadWrite(
"esch",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
)
defer schemaDB.Close()

archivalDB := jobsdb.NewForReadWrite(
"arc",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithJobMaxAge(
func() time.Duration {
Expand Down
11 changes: 3 additions & 8 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,12 @@ type gatewayApp struct {
versionHandler func(w http.ResponseWriter, r *http.Request)
log logger.Logger
config struct {
gatewayDSLimit int
gatewayDSLimit misc.ValueLoader[int]
}
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func (a *gatewayApp) loadConfiguration() {
config.RegisterIntConfigVariable(0, &a.config.gatewayDSLimit, true, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
}

func (a *gatewayApp) Setup(options *app.Options) error {
a.loadConfiguration()
a.config.gatewayDSLimit = config.GetReloadableIntVar(0, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
if err := db.HandleNullRecovery(options.NormalMode, options.DegradedMode, misc.AppStartTime, app.GATEWAY); err != nil {
return err
}
Expand Down Expand Up @@ -80,7 +75,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.gatewayDSLimit),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
Expand Down
35 changes: 15 additions & 20 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ type processorApp struct {
versionHandler func(w http.ResponseWriter, r *http.Request)
log logger.Logger
config struct {
processorDSLimit int
routerDSLimit int
batchRouterDSLimit int
gatewayDSLimit int
processorDSLimit misc.ValueLoader[int]
routerDSLimit misc.ValueLoader[int]
batchRouterDSLimit misc.ValueLoader[int]
gatewayDSLimit misc.ValueLoader[int]
http struct {
ReadTimeout time.Duration
ReadHeaderTimeout time.Duration
Expand All @@ -64,22 +64,17 @@ type processorApp struct {
}
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func (a *processorApp) loadConfiguration() {
func (a *processorApp) Setup(options *app.Options) error {
a.config.http.ReadTimeout = config.GetDurationVar(0, time.Second, []string{"ReadTimeout", "ReadTimeOutInSec"}...)
a.config.http.ReadHeaderTimeout = config.GetDurationVar(0, time.Second, []string{"ReadHeaderTimeout", "ReadHeaderTimeoutInSec"}...)
a.config.http.WriteTimeout = config.GetDurationVar(10, time.Second, []string{"WriteTimeout", "WriteTimeOutInSec"}...)
a.config.http.IdleTimeout = config.GetDurationVar(720, time.Second, []string{"IdleTimeout", "IdleTimeoutInSec"}...)
a.config.http.webPort = config.GetIntVar(8086, 1, "Processor.webPort")
a.config.http.MaxHeaderBytes = config.GetIntVar(524288, 1, "MaxHeaderBytes")
config.RegisterIntConfigVariable(0, &a.config.processorDSLimit, true, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.gatewayDSLimit, true, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.routerDSLimit, true, 1, "Router.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.batchRouterDSLimit, true, 1, "BatchRouter.jobsDB.dsLimit", "JobsDB.dsLimit")
}

func (a *processorApp) Setup(options *app.Options) error {
a.loadConfiguration()
a.config.processorDSLimit = config.GetReloadableIntVar(0, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit")
a.config.gatewayDSLimit = config.GetReloadableIntVar(0, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
a.config.routerDSLimit = config.GetReloadableIntVar(0, 1, "Router.jobsDB.dsLimit", "JobsDB.dsLimit")
a.config.batchRouterDSLimit = config.GetReloadableIntVar(0, 1, "BatchRouter.jobsDB.dsLimit", "JobsDB.dsLimit")
if err := db.HandleNullRecovery(options.NormalMode, options.DegradedMode, misc.AppStartTime, app.PROCESSOR); err != nil {
return err
}
Expand Down Expand Up @@ -148,7 +143,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.gatewayDSLimit),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
)
Expand All @@ -157,7 +152,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
"rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.routerDSLimit),
jobsdb.WithDSLimit(a.config.routerDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
)
Expand All @@ -166,7 +161,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
"batch_rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.batchRouterDSLimit),
jobsdb.WithDSLimit(a.config.batchRouterDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
)
Expand All @@ -175,7 +170,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
)
Expand All @@ -192,15 +187,15 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
schemaDB := jobsdb.NewForReadWrite(
"esch",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer schemaDB.Close()

archivalDB := jobsdb.NewForReadWrite(
"arc",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithJobMaxAge(
func() time.Duration {
Expand Down
2 changes: 0 additions & 2 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
mocksTransformer "github.com/rudderlabs/rudder-server/mocks/processor/transformer"
"github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/processor/stash"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/router/batchrouter"
routermanager "github.com/rudderlabs/rudder-server/router/manager"
Expand Down Expand Up @@ -161,7 +160,6 @@ var (
func initJobsDB() {
config.Reset()
logger.Reset()
stash.Init()
admin.Init()
Init()
}
Expand Down
25 changes: 11 additions & 14 deletions backend-config/backend-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/sysUtils"
"github.com/rudderlabs/rudder-server/utils/types"
Expand All @@ -31,14 +32,13 @@ import (

var (
// environment variables
configBackendURL string
cpRouterURL string
pollInterval, regulationsPollInterval time.Duration
configJSONPath string
configFromFile bool
maxRegulationsPerRequest int
configEnvReplacementEnabled bool
dbCacheEnabled bool
configBackendURL string
cpRouterURL string
pollInterval misc.ValueLoader[time.Duration]
configJSONPath string
configFromFile bool
configEnvReplacementEnabled bool
dbCacheEnabled bool

LastSync string
LastRegulationSync string
Expand Down Expand Up @@ -92,13 +92,10 @@ type backendConfigImpl struct {
cache cache.Cache
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func loadConfig() {
configBackendURL = config.GetString("CONFIG_BACKEND_URL", "https://api.rudderstack.com")
cpRouterURL = config.GetString("CP_ROUTER_URL", "https://cp-router.rudderlabs.com")
config.RegisterDurationConfigVariable(5, &pollInterval, true, time.Second, "BackendConfig.pollInterval", "BackendConfig.pollIntervalInS")
config.RegisterDurationConfigVariable(300, &regulationsPollInterval, true, time.Second, "BackendConfig.regulationsPollInterval", "BackendConfig.regulationsPollIntervalInS")
config.RegisterIntConfigVariable(1000, &maxRegulationsPerRequest, true, 1, "BackendConfig.maxRegulationsPerRequest")
pollInterval = config.GetReloadableDurationVar(5, time.Second, "BackendConfig.pollInterval", "BackendConfig.pollIntervalInS")
configJSONPath = config.GetStringVar("/etc/rudderstack/workspaceConfig.json", "BackendConfig.configJSONPath")
configFromFile = config.GetBoolVar(false, "BackendConfig.configFromFile")
configEnvReplacementEnabled = config.GetBoolVar(true, "BackendConfig.envReplacementEnabled")
Expand Down Expand Up @@ -246,7 +243,7 @@ func (bc *backendConfigImpl) pollConfigUpdate(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(pollInterval):
case <-time.After(pollInterval.Load()):
}
}
}
Expand Down Expand Up @@ -384,7 +381,7 @@ func (bc *backendConfigImpl) WaitForConfig(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(pollInterval):
case <-time.After(pollInterval.Load()):
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
adminpkg "github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/backend-config/internal/cache"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)
Expand Down Expand Up @@ -324,7 +325,7 @@ func TestWaitForConfig(t *testing.T) {
defer ctrl.Finish()

pkgLogger = logger.NOP
pollInterval = time.Millisecond
pollInterval = misc.SingleValueLoader(time.Millisecond)
bc := &backendConfigImpl{initialized: false}

var done int32
Expand Down
Loading

0 comments on commit 6ef6694

Please sign in to comment.