Skip to content

Commit

Permalink
chore: master pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 14, 2023
2 parents c88837f + 60749af commit 8aa056c
Show file tree
Hide file tree
Showing 47 changed files with 387 additions and 369 deletions.
3 changes: 2 additions & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ type embeddedApp struct {
}
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func (a *embeddedApp) loadConfiguration() {
config.RegisterBoolConfigVariable(types.DefaultReplayEnabled, &a.config.enableReplay, false, "Replay.enabled")
a.config.enableReplay = config.GetBoolVar(types.DefaultReplayEnabled, "Replay.enabled")
config.RegisterIntConfigVariable(0, &a.config.processorDSLimit, true, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.gatewayDSLimit, true, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.routerDSLimit, true, 1, "Router.jobsDB.dsLimit", "JobsDB.dsLimit")
Expand Down
1 change: 1 addition & 0 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type gatewayApp struct {
}
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func (a *gatewayApp) loadConfiguration() {
config.RegisterIntConfigVariable(0, &a.config.gatewayDSLimit, true, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
}
Expand Down
13 changes: 7 additions & 6 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ type processorApp struct {
}
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func (a *processorApp) loadConfiguration() {
config.RegisterDurationConfigVariable(0, &a.config.http.ReadTimeout, false, time.Second, []string{"ReadTimeout", "ReadTimeOutInSec"}...)
config.RegisterDurationConfigVariable(0, &a.config.http.ReadHeaderTimeout, false, time.Second, []string{"ReadHeaderTimeout", "ReadHeaderTimeoutInSec"}...)
config.RegisterDurationConfigVariable(10, &a.config.http.WriteTimeout, false, time.Second, []string{"WriteTimeout", "WriteTimeOutInSec"}...)
config.RegisterDurationConfigVariable(720, &a.config.http.IdleTimeout, false, time.Second, []string{"IdleTimeout", "IdleTimeoutInSec"}...)
config.RegisterIntConfigVariable(8086, &a.config.http.webPort, false, 1, "Processor.webPort")
config.RegisterIntConfigVariable(524288, &a.config.http.MaxHeaderBytes, false, 1, "MaxHeaderBytes")
a.config.http.ReadTimeout = config.GetDurationVar(0, time.Second, []string{"ReadTimeout", "ReadTimeOutInSec"}...)
a.config.http.ReadHeaderTimeout = config.GetDurationVar(0, time.Second, []string{"ReadHeaderTimeout", "ReadHeaderTimeoutInSec"}...)
a.config.http.WriteTimeout = config.GetDurationVar(10, time.Second, []string{"WriteTimeout", "WriteTimeOutInSec"}...)
a.config.http.IdleTimeout = config.GetDurationVar(720, time.Second, []string{"IdleTimeout", "IdleTimeoutInSec"}...)
a.config.http.webPort = config.GetIntVar(8086, 1, "Processor.webPort")
a.config.http.MaxHeaderBytes = config.GetIntVar(524288, 1, "MaxHeaderBytes")
config.RegisterIntConfigVariable(0, &a.config.processorDSLimit, true, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.gatewayDSLimit, true, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.routerDSLimit, true, 1, "Router.jobsDB.dsLimit", "JobsDB.dsLimit")
Expand Down
8 changes: 4 additions & 4 deletions app/cluster/state/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ func EnvETCDConfig() *ETCDConfig {
var ackTimeout time.Duration

envConfigOnce.Do(func() {
config.RegisterDurationConfigVariable(15, &ackTimeout, false, time.Second, "etcd.ackTimeout")
config.RegisterDurationConfigVariable(30, &keepaliveTime, false, time.Second, "etcd.keepaliveTime")
config.RegisterDurationConfigVariable(10, &keepaliveTimeout, false, time.Second, "etcd.keepaliveTimeout")
config.RegisterDurationConfigVariable(20, &dialTimeout, false, time.Second, "etcd.dialTimeout")
ackTimeout = config.GetDurationVar(15, time.Second, "etcd.ackTimeout")
keepaliveTime = config.GetDurationVar(30, time.Second, "etcd.keepaliveTime")
keepaliveTimeout = config.GetDurationVar(10, time.Second, "etcd.keepaliveTimeout")
dialTimeout = config.GetDurationVar(20, time.Second, "etcd.dialTimeout")
})

return &ETCDConfig{
Expand Down
15 changes: 8 additions & 7 deletions backend-config/backend-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,18 @@ type backendConfigImpl struct {
cache cache.Cache
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func loadConfig() {
configBackendURL = config.GetString("CONFIG_BACKEND_URL", "https://api.rudderstack.com")
cpRouterURL = config.GetString("CP_ROUTER_URL", "https://cp-router.rudderlabs.com")
config.RegisterDurationConfigVariable(5, &pollInterval, true, time.Second, []string{"BackendConfig.pollInterval", "BackendConfig.pollIntervalInS"}...)
config.RegisterDurationConfigVariable(300, &regulationsPollInterval, true, time.Second, []string{"BackendConfig.regulationsPollInterval", "BackendConfig.regulationsPollIntervalInS"}...)
config.RegisterStringConfigVariable("/etc/rudderstack/workspaceConfig.json", &configJSONPath, false, "BackendConfig.configJSONPath")
config.RegisterBoolConfigVariable(false, &configFromFile, false, "BackendConfig.configFromFile")
config.RegisterDurationConfigVariable(5, &pollInterval, true, time.Second, "BackendConfig.pollInterval", "BackendConfig.pollIntervalInS")
config.RegisterDurationConfigVariable(300, &regulationsPollInterval, true, time.Second, "BackendConfig.regulationsPollInterval", "BackendConfig.regulationsPollIntervalInS")
config.RegisterIntConfigVariable(1000, &maxRegulationsPerRequest, true, 1, "BackendConfig.maxRegulationsPerRequest")
config.RegisterBoolConfigVariable(true, &configEnvReplacementEnabled, false, "BackendConfig.envReplacementEnabled")
config.RegisterBoolConfigVariable(false, &incrementalConfigUpdates, false, "BackendConfig.incrementalConfigUpdates")
config.RegisterBoolConfigVariable(true, &dbCacheEnabled, false, "BackendConfig.dbCacheEnabled")
configJSONPath = config.GetStringVar("/etc/rudderstack/workspaceConfig.json", "BackendConfig.configJSONPath")
configFromFile = config.GetBoolVar(false, "BackendConfig.configFromFile")
configEnvReplacementEnabled = config.GetBoolVar(true, "BackendConfig.envReplacementEnabled")
incrementalConfigUpdates = config.GetBoolVar(false, "BackendConfig.incrementalConfigUpdates")
dbCacheEnabled = config.GetBoolVar(true, "BackendConfig.dbCacheEnabled")
}

func Init() {
Expand Down
56 changes: 30 additions & 26 deletions gateway/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,41 +64,31 @@ func (gw *Handle) Setup(
gw.rsourcesService = rsourcesService
gw.sourcehandle = sourcehandle

config.RegisterDurationConfigVariable(30, &gw.conf.httpTimeout, false, time.Second, "Gateway.httpTimeout")
gw.conf.httpTimeout = config.GetDurationVar(30, time.Second, "Gateway.httpTimeout")
// Port where GW is running
config.RegisterIntConfigVariable(8080, &gw.conf.webPort, false, 1, "Gateway.webPort")
gw.conf.webPort = config.GetIntVar(8080, 1, "Gateway.webPort")
// Number of incoming requests that are batched before handing off to write workers
config.RegisterIntConfigVariable(128, &gw.conf.maxUserWebRequestBatchSize, false, 1, "Gateway.maxUserRequestBatchSize")
gw.conf.maxUserWebRequestBatchSize = config.GetIntVar(128, 1, "Gateway.maxUserRequestBatchSize")
// Number of userWorkerBatchRequest that are batched before initiating write
config.RegisterIntConfigVariable(128, &gw.conf.maxDBBatchSize, false, 1, "Gateway.maxDBBatchSize")
// Timeout after which batch is formed anyway with whatever requests
// are available
config.RegisterDurationConfigVariable(15, &gw.conf.userWebRequestBatchTimeout, true, time.Millisecond, []string{"Gateway.userWebRequestBatchTimeout", "Gateway.userWebRequestBatchTimeoutInMS"}...)
config.RegisterDurationConfigVariable(5, &gw.conf.dbBatchWriteTimeout, true, time.Millisecond, []string{"Gateway.dbBatchWriteTimeout", "Gateway.dbBatchWriteTimeoutInMS"}...)
gw.conf.maxDBBatchSize = config.GetIntVar(128, 1, "Gateway.maxDBBatchSize")
// Multiple workers are used to batch user web requests
config.RegisterIntConfigVariable(64, &gw.conf.maxUserWebRequestWorkerProcess, false, 1, "Gateway.maxUserWebRequestWorkerProcess")
gw.conf.maxUserWebRequestWorkerProcess = config.GetIntVar(64, 1, "Gateway.maxUserWebRequestWorkerProcess")
// Multiple DB writers are used to write data to DB
config.RegisterIntConfigVariable(256, &gw.conf.maxDBWriterProcess, false, 1, "Gateway.maxDBWriterProcess")
// Maximum request size to gateway
config.RegisterIntConfigVariable(4000, &gw.conf.maxReqSize, true, 1024, "Gateway.maxReqSizeInKB")
// Enable rate limit on incoming events. false by default
config.RegisterBoolConfigVariable(false, &gw.conf.enableRateLimit, true, "Gateway.enableRateLimit")
gw.conf.maxDBWriterProcess = config.GetIntVar(256, 1, "Gateway.maxDBWriterProcess")
gw.setupReloadableVars(config)
// Enable suppress user feature. false by default
config.RegisterBoolConfigVariable(true, &gw.conf.enableSuppressUserFeature, false, "Gateway.enableSuppressUserFeature")
gw.conf.enableSuppressUserFeature = config.GetBoolVar(true, "Gateway.enableSuppressUserFeature")
// EventSchemas feature. false by default
config.RegisterBoolConfigVariable(false, &gw.conf.enableEventSchemasFeature, false, "EventSchemas.enableEventSchemasFeature")
gw.conf.enableEventSchemasFeature = config.GetBoolVar(false, "EventSchemas.enableEventSchemasFeature")
// Time period for diagnosis ticker
config.RegisterDurationConfigVariable(60, &gw.conf.diagnosisTickerTime, false, time.Second, []string{"Diagnostics.gatewayTimePeriod", "Diagnostics.gatewayTimePeriodInS"}...)
// Enables accepting requests without user id and anonymous id. This is added to prevent client 4xx retries.
config.RegisterBoolConfigVariable(false, &gw.conf.allowReqsWithoutUserIDAndAnonymousID, true, "Gateway.allowReqsWithoutUserIDAndAnonymousID")
config.RegisterBoolConfigVariable(true, &gw.conf.gwAllowPartialWriteWithErrors, true, "Gateway.allowPartialWriteWithErrors")
config.RegisterDurationConfigVariable(0, &gw.conf.ReadTimeout, false, time.Second, []string{"ReadTimeout", "ReadTimeOutInSec"}...)
config.RegisterDurationConfigVariable(0, &gw.conf.ReadHeaderTimeout, false, time.Second, []string{"ReadHeaderTimeout", "ReadHeaderTimeoutInSec"}...)
config.RegisterDurationConfigVariable(10, &gw.conf.WriteTimeout, false, time.Second, []string{"WriteTimeout", "WriteTimeOutInSec"}...)
config.RegisterDurationConfigVariable(720, &gw.conf.IdleTimeout, false, time.Second, []string{"IdleTimeout", "IdleTimeoutInSec"}...)
config.RegisterIntConfigVariable(524288, &gw.conf.maxHeaderBytes, false, 1, "MaxHeaderBytes")
gw.conf.diagnosisTickerTime = config.GetDurationVar(60, time.Second, "Diagnostics.gatewayTimePeriod", "Diagnostics.gatewayTimePeriodInS")
gw.conf.ReadTimeout = config.GetDurationVar(0, time.Second, "ReadTimeout", "ReadTimeOutInSec")
gw.conf.ReadHeaderTimeout = config.GetDurationVar(0, time.Second, "ReadHeaderTimeout", "ReadHeaderTimeoutInSec")
gw.conf.WriteTimeout = config.GetDurationVar(10, time.Second, "WriteTimeout", "WriteTimeOutInSec")
gw.conf.IdleTimeout = config.GetDurationVar(720, time.Second, "IdleTimeout", "IdleTimeoutInSec")
gw.conf.maxHeaderBytes = config.GetIntVar(524288, 1, "MaxHeaderBytes")
// if set to '0', it means disabled.
config.RegisterIntConfigVariable(50000, &gw.conf.maxConcurrentRequests, false, 1, "Gateway.maxConcurrentRequests")
gw.conf.maxConcurrentRequests = config.GetIntVar(50000, 1, "Gateway.maxConcurrentRequests")

// Registering stats
gw.batchSizeStat = gw.stats.NewStat("gateway.batch_size", stats.HistogramType)
Expand Down Expand Up @@ -165,6 +155,20 @@ func (gw *Handle) Setup(
return nil
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func (gw *Handle) setupReloadableVars(config *config.Config) {
// Timeout after which batch is formed anyway with whatever requests are available
config.RegisterDurationConfigVariable(15, &gw.conf.userWebRequestBatchTimeout, true, time.Millisecond, "Gateway.userWebRequestBatchTimeout", "Gateway.userWebRequestBatchTimeoutInMS")
config.RegisterDurationConfigVariable(5, &gw.conf.dbBatchWriteTimeout, true, time.Millisecond, "Gateway.dbBatchWriteTimeout", "Gateway.dbBatchWriteTimeoutInMS")
// Enables accepting requests without user id and anonymous id. This is added to prevent client 4xx retries.
config.RegisterBoolConfigVariable(false, &gw.conf.allowReqsWithoutUserIDAndAnonymousID, true, "Gateway.allowReqsWithoutUserIDAndAnonymousID")
config.RegisterBoolConfigVariable(true, &gw.conf.gwAllowPartialWriteWithErrors, true, "Gateway.allowPartialWriteWithErrors")
// Maximum request size to gateway
config.RegisterIntConfigVariable(4000, &gw.conf.maxReqSize, true, 1024, "Gateway.maxReqSizeInKB")
// Enable rate limit on incoming events. false by default
config.RegisterBoolConfigVariable(false, &gw.conf.enableRateLimit, true, "Gateway.enableRateLimit")
}

// initUserWebRequestWorkers initiates `maxUserWebRequestWorkerProcess` number of `webRequestWorkers` that listen on their `webRequestQ` for new WebRequests.
func (gw *Handle) initUserWebRequestWorkers() {
gw.userWebRequestWorkers = make([]*userWebRequestWorkerT, gw.conf.maxUserWebRequestWorkerProcess)
Expand Down
1 change: 1 addition & 0 deletions gateway/webhook/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
)

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func loadConfig() {
sourceTransformerURL = strings.TrimSuffix(config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"), "/") + "/v0/sources"
// Number of incoming webhooks that are batched before calling source transformer
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ require (
github.com/rs/cors v1.9.0
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/rudderlabs/compose-test v0.1.3
github.com/rudderlabs/rudder-go-kit v0.15.8
github.com/rudderlabs/rudder-go-kit v0.15.9
github.com/rudderlabs/sql-tunnels v0.1.4
github.com/samber/lo v1.38.1
github.com/segmentio/kafka-go v0.4.42
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1774,8 +1774,8 @@ github.com/rudderlabs/compose-test v0.1.3 h1:uyep6jDCIF737sfv4zIaMsKRQKX95IDz5Xb
github.com/rudderlabs/compose-test v0.1.3/go.mod h1:tuvS1eQdSfwOYv1qwyVAcpdJxPLQXJgy5xGDd/9XmMg=
github.com/rudderlabs/parquet-go v0.0.2 h1:ZXRdZdimB0PdJtmxeSSxfI0fDQ3kZjwzBxRi6Ut1J8k=
github.com/rudderlabs/parquet-go v0.0.2/go.mod h1:g6guum7o8uhj/uNhunnt7bw5Vabu/goI5i21/3fnxWQ=
github.com/rudderlabs/rudder-go-kit v0.15.8 h1:EpTHGpvqTjx9XotO67g3lTIaWzzVYBMGWvuimxtZUY0=
github.com/rudderlabs/rudder-go-kit v0.15.8/go.mod h1:W7b3rft7Kt8TvNd4gguw5ZB9Hu4M1k60EGLv91+N2zM=
github.com/rudderlabs/rudder-go-kit v0.15.9 h1:i1dUUL1JLAOOn+2RgVq0K6UnTn6X4OZMTiZPZZBKmTQ=
github.com/rudderlabs/rudder-go-kit v0.15.9/go.mod h1:W7b3rft7Kt8TvNd4gguw5ZB9Hu4M1k60EGLv91+N2zM=
github.com/rudderlabs/sql-tunnels v0.1.4 h1:snnkUItx3nRCPGSouibkYzBpOcEiWhCVjRY95A0eMuk=
github.com/rudderlabs/sql-tunnels v0.1.4/go.mod h1:C6O0M6C3V+pdVwjy3UvyPd+d5SeRvbBQfApm+67qNnI=
github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY=
Expand Down
1 change: 1 addition & 0 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ func (jd *Handle) workersAndAuxSetup() {
jd.statDropDSPeriod = stats.Default.NewTaggedStat("jobsdb.drop_ds_period", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix})
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func (jd *Handle) loadConfig() {
// maxTableSizeInMB: Maximum Table size in MB
jd.config.RegisterInt64ConfigVariable(300, &jd.conf.maxTableSize, true, 1000000, "JobsDB.maxTableSizeInMB")
Expand Down
14 changes: 7 additions & 7 deletions jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,11 @@ func (jd *Handle) getCleanUpCandidates(ctx context.Context, dsList []dataSetT) (
var rows *sql.Rows
rows, err := jd.dbHandle.QueryContext(
ctx,
`SELECT reltuples AS estimate, relname
FROM pg_class
`SELECT reltuples AS estimate, relname
FROM pg_class
where relname = ANY(
SELECT tablename
FROM pg_catalog.pg_tables
SELECT tablename
FROM pg_catalog.pg_tables
WHERE schemaname NOT IN ('pg_catalog','information_schema')
AND tablename like $1
)`,
Expand Down Expand Up @@ -432,13 +432,13 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat
`with last_status as (select * from "v_last_%[1]s"),
inserted_jobs as
(
insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, j.event_payload, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
where js.job_id is null or js.job_state = ANY('{%[5]s}') order by j.job_id) returning job_id
),
insertedStatuses as
insertedStatuses as
(
insert into %[4]q (job_id, job_state, attempt, exec_time, retry_time, error_code, error_response, parameters)
insert into %[4]q (job_id, job_state, attempt, exec_time, retry_time, error_code, error_response, parameters)
(select job_id, job_state, attempt, exec_time, retry_time, error_code, error_response, parameters from last_status where job_state = ANY('{%[5]s}'))
)
select count(*) from inserted_jobs;`,
Expand Down
Loading

0 comments on commit 8aa056c

Please sign in to comment.