chore: limit queries to ds (#2446)
BonapartePC committed Sep 21, 2022
1 parent 8199002 commit 082b9f5
Showing 6 changed files with 152 additions and 31 deletions.
4 changes: 4 additions & 0 deletions app/apphandlers/embeddedAppHandler.go
@@ -91,6 +91,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithMigrationMode(migrationMode),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&gatewayDSLimit),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
@@ -99,6 +100,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithMigrationMode(migrationMode),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&routerDSLimit),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
@@ -107,6 +109,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithMigrationMode(migrationMode),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&batchRouterDSLimit),
)
defer batchRouterDB.Close()
errDB := jobsdb.NewForReadWrite(
@@ -115,6 +118,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithMigrationMode(migrationMode),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&processorDSLimit),
)

var tenantRouterDB jobsdb.MultiTenantJobsDB
1 change: 1 addition & 0 deletions app/apphandlers/gatewayAppHandler.go
@@ -55,6 +55,7 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithMigrationMode(migrationMode),
jobsdb.WithStatusHandler(),
jobsdb.WithDSLimit(&gatewayDSLimit),
)
defer gatewayDB.Close()
if err := gatewayDB.Start(); err != nil {
26 changes: 19 additions & 7 deletions app/apphandlers/processorAppHandler.go
@@ -43,13 +43,17 @@ type ProcessorApp struct {
}

var (
gatewayDB *jobsdb.HandleT
ReadTimeout time.Duration
ReadHeaderTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
webPort int
MaxHeaderBytes int
gatewayDB *jobsdb.HandleT
ReadTimeout time.Duration
ReadHeaderTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
webPort int
MaxHeaderBytes int
processorDSLimit int
routerDSLimit int
batchRouterDSLimit int
gatewayDSLimit int
)

func (*ProcessorApp) GetAppType() string {
@@ -67,6 +71,10 @@ func loadConfigHandler() {
config.RegisterDurationConfigVariable(720, &IdleTimeout, false, time.Second, []string{"IdleTimeout", "IdleTimeoutInSec"}...)
config.RegisterIntConfigVariable(8086, &webPort, false, 1, "Processor.webPort")
config.RegisterIntConfigVariable(524288, &MaxHeaderBytes, false, 1, "MaxHeaderBytes")
config.RegisterIntConfigVariable(0, &processorDSLimit, true, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &gatewayDSLimit, true, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &routerDSLimit, true, 1, "Router.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &batchRouterDSLimit, true, 1, "BatchRouter.jobsDB.dsLimit", "JobsDB.dsLimit")
}

func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app.Options) error {
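
The four dsLimit variables default to 0, meaning no cap, and each is registered under a component-specific key with JobsDB.dsLimit as what appears to be a shared fallback; the boolean argument presumably marks them as hot-reloadable. That matters because only the variables' addresses are handed to jobsdb below, and the limit is re-read on every query (see the getUnprocessed and GetProcessed hunks further down). A minimal sketch of that pointer-sharing idea, using a stand-in dsQuerier type rather than the real config or jobsdb packages:

package main

import "fmt"

// dsQuerier mirrors jobsdb.HandleT's dsLimit field: a nil pointer (or a value
// of 0) means "no limit configured".
type dsQuerier struct {
	dsLimit *int
}

// datasetsToQuery re-reads the limit on every call, the way getUnprocessed and
// GetProcessed dereference jd.dsLimit before iterating the dataset list.
func (q *dsQuerier) datasetsToQuery(total int) int {
	limit := 0
	if q.dsLimit != nil {
		limit = *q.dsLimit
	}
	if limit <= 0 || limit > total {
		return total
	}
	return limit
}

func main() {
	var processorDSLimit int // registered default is 0, i.e. unlimited
	q := &dsQuerier{dsLimit: &processorDSLimit}

	fmt.Println(q.datasetsToQuery(10)) // 10: no cap yet

	processorDSLimit = 3 // e.g. a hot-reloaded Processor.jobsDB.dsLimit
	fmt.Println(q.datasetsToQuery(10)) // 3: the new value is picked up on the next query
}
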
@@ -114,6 +122,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithMigrationMode(migrationMode),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&gatewayDSLimit),
)
defer gwDBForProcessor.Close()
gatewayDB = gwDBForProcessor
@@ -123,6 +132,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithMigrationMode(migrationMode),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&routerDSLimit),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
@@ -131,6 +141,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithMigrationMode(migrationMode),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&batchRouterDSLimit),
)
defer batchRouterDB.Close()
errDB := jobsdb.NewForReadWrite(
@@ -139,6 +150,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithMigrationMode(migrationMode),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&processorDSLimit),
)
var tenantRouterDB jobsdb.MultiTenantJobsDB
var multitenantStats multitenant.MultiTenantI
81 changes: 58 additions & 23 deletions jobsdb/jobsdb.go
@@ -423,7 +423,6 @@ type HandleT struct {
isStatNewDSPeriodInitialized bool
statDropDSPeriod stats.RudderStats
unionQueryTime stats.RudderStats
tablesQueriedStat stats.RudderStats
isStatDropDSPeriodInitialized bool
migrationState migrationState
inProgressMigrationTargetDS *dataSetT
@@ -434,6 +433,7 @@ type HandleT struct {
enableWriterQueue bool
enableReaderQueue bool
clearAll bool
dsLimit *int
maxReaders int
maxWriters int
maxOpenConnections int
@@ -735,6 +735,12 @@ func WithPreBackupHandlers(preBackupHandlers []prebackup.Handler) OptsFunc {
}
}

func WithDSLimit(limit *int) OptsFunc {
return func(jd *HandleT) {
jd.dsLimit = limit
}
}

func NewForRead(tablePrefix string, opts ...OptsFunc) *HandleT {
return newOwnerType(Read, tablePrefix, opts...)
}
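
WithDSLimit follows the same OptsFunc pattern as the other options in this file and stores only a pointer, so the caller keeps ownership of the value and can change it after the handle is built. A hedged usage sketch against the constructors shown here; the package name, helper name, and table prefix are illustrative, and the import path assumes the repository's usual module path:

package example

import "github.com/rudderlabs/rudder-server/jobsdb"

// newCappedReadDB is an illustrative helper: it builds a read handle whose
// per-query dataset cap is controlled by the caller-owned *int. Passing a
// pointer to 0, or omitting the option, keeps the previous uncapped behaviour.
func newCappedReadDB(prefix string, dsLimit *int) *jobsdb.HandleT {
	return jobsdb.NewForRead(
		prefix,
		jobsdb.WithStatusHandler(),
		jobsdb.WithDSLimit(dsLimit),
	)
}
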
@@ -865,10 +871,6 @@ func (jd *HandleT) workersAndAuxSetup() {

jd.statTableCount = stats.DefaultStats.NewStat(fmt.Sprintf("jobsdb.%s_tables_count", jd.tablePrefix), stats.GaugeType)
jd.statDSCount = stats.NewTaggedStat("jobsdb.tables_count", stats.GaugeType, stats.Tags{"customVal": jd.tablePrefix})
jd.tablesQueriedStat = stats.NewTaggedStat("tables_queried_gauge", stats.GaugeType, stats.Tags{
"state": "nonterminal",
"customVal": jd.tablePrefix,
})
jd.unionQueryTime = stats.NewTaggedStat("union_query_time", stats.TimerType, stats.Tags{
"state": "nonterminal",
"customVal": jd.tablePrefix,
@@ -1944,12 +1946,12 @@ func (jd *HandleT) migrateJobs(ctx context.Context, srcDS, destDS dataSetT) (noJ
defer jd.dsListLock.RUnlock()

// Unprocessed jobs
unprocessedList, err := jd.getUnprocessedJobsDS(ctx, srcDS, false, GetQueryParamsT{})
unprocessedList, _, err := jd.getUnprocessedJobsDS(ctx, srcDS, false, GetQueryParamsT{})
if err != nil {
return 0, err
}
// Jobs which haven't finished processing
retryList, err := jd.getProcessedJobsDS(ctx, srcDS, true,
retryList, _, err := jd.getProcessedJobsDS(ctx, srcDS, true,
GetQueryParamsT{StateFilters: validNonTerminalStates})
if err != nil {
return 0, err
@@ -2623,7 +2625,7 @@ stateFilters and customValFilters do a OR query on values passed in array
parameterFilters do a AND query on values included in the map.
A JobsLimit less than or equal to zero indicates no limit.
*/
func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll bool, params GetQueryParamsT) (JobsResult, error) { // skipcq: CRT-P0003
func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll bool, params GetQueryParamsT) (JobsResult, bool, error) { // skipcq: CRT-P0003
stateFilters := params.StateFilters
customValFilters := params.CustomValFilters
parameterFilters := params.ParameterFilters
Expand All @@ -2632,7 +2634,7 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll b

if jd.isEmptyResult(ds, allWorkspaces, stateFilters, customValFilters, parameterFilters) {
jd.logger.Debugf("[getProcessedJobsDS] Empty cache hit for ds: %v, stateFilters: %v, customValFilters: %v, parameterFilters: %v", ds, stateFilters, customValFilters, parameterFilters)
return JobsResult{}, nil
return JobsResult{}, false, nil
}

tags := statTags{CustomValFilters: params.CustomValFilters, StateFilters: params.StateFilters, ParameterFilters: params.ParameterFilters}
@@ -2694,7 +2696,7 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll b
var err error
rows, err = jd.dbHandle.QueryContext(ctx, sqlStatement)
if err != nil {
return JobsResult{}, err
return JobsResult{}, false, err
}
defer func() { _ = rows.Close() }()

@@ -2743,12 +2745,12 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll b

stmt, err := jd.dbHandle.PrepareContext(ctx, sqlStatement)
if err != nil {
return JobsResult{}, err
return JobsResult{}, false, err
}
defer func() { _ = stmt.Close() }()
rows, err = stmt.QueryContext(ctx, args...)
if err != nil {
return JobsResult{}, err
return JobsResult{}, false, err
}
defer func() { _ = rows.Close() }()
}
Expand All @@ -2770,7 +2772,7 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll b
&job.LastJobStatus.ExecTime, &job.LastJobStatus.RetryTime,
&job.LastJobStatus.ErrorCode, &job.LastJobStatus.ErrorResponse, &job.LastJobStatus.Parameters)
if err != nil {
return JobsResult{}, err
return JobsResult{}, false, err
}

if !getAll { // if getAll is true, limits do not apply
@@ -2811,7 +2813,7 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll b
LimitsReached: limitsReached,
PayloadSize: payloadSize,
EventsCount: eventCount,
}, nil
}, true, nil
}

/*
@@ -2820,13 +2822,13 @@ stateFilters and customValFilters do a OR query on values passed in array
parameterFilters do a AND query on values included in the map.
A JobsLimit less than or equal to zero indicates no limit.
*/
func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, order bool, params GetQueryParamsT) (JobsResult, error) { // skipcq: CRT-P0003
func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, order bool, params GetQueryParamsT) (JobsResult, bool, error) { // skipcq: CRT-P0003
customValFilters := params.CustomValFilters
parameterFilters := params.ParameterFilters

if jd.isEmptyResult(ds, allWorkspaces, []string{NotProcessed.State}, customValFilters, parameterFilters) {
jd.logger.Debugf("[getUnprocessedJobsDS] Empty cache hit for ds: %v, stateFilters: NP, customValFilters: %v, parameterFilters: %v", ds, customValFilters, parameterFilters)
return JobsResult{}, nil
return JobsResult{}, false, nil
}

tags := statTags{CustomValFilters: params.CustomValFilters, ParameterFilters: params.ParameterFilters}
@@ -2903,11 +2905,11 @@ func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, order

rows, err = jd.dbHandle.QueryContext(ctx, sqlStatement, args...)
if err != nil {
return JobsResult{}, err
return JobsResult{}, false, err
}
defer func() { _ = rows.Close() }()
if err != nil {
return JobsResult{}, err
return JobsResult{}, false, err
}
var runningEventCount int
var runningPayloadSize int64
@@ -2922,7 +2924,7 @@ func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, order
err := rows.Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal,
&job.EventPayload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PayloadSize, &runningEventCount, &runningPayloadSize)
if err != nil {
return JobsResult{}, err
return JobsResult{}, false, err
}
if params.EventsLimit > 0 && runningEventCount > params.EventsLimit && len(jobList) > 0 {
// events limit overflow is triggered as long as we have read at least one job
@@ -2963,7 +2965,7 @@ func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, order
LimitsReached: limitsReached,
PayloadSize: payloadSize,
EventsCount: eventCount,
}, nil
}, true, nil
}

// copyJobStatusDS is expected to be called only during a migration
@@ -4328,11 +4330,22 @@ func (jd *HandleT) getUnprocessed(ctx context.Context, params GetQueryParamsT) (
}

var completeUnprocessedJobs JobsResult
dsQueryCount := 0
var dsLimit int
if jd.dsLimit != nil {
dsLimit = *jd.dsLimit
}
for _, ds := range dsList {
unprocessedJobs, err := jd.getUnprocessedJobsDS(ctx, ds, true, params)
if dsLimit > 0 && dsQueryCount >= dsLimit {
break
}
unprocessedJobs, dsHit, err := jd.getUnprocessedJobsDS(ctx, ds, true, params)
if err != nil {
return JobsResult{}, err
}
if dsHit {
dsQueryCount++
}
completeUnprocessedJobs.Jobs = append(completeUnprocessedJobs.Jobs, unprocessedJobs.Jobs...)
completeUnprocessedJobs.EventsCount += unprocessedJobs.EventsCount
completeUnprocessedJobs.PayloadSize += unprocessedJobs.PayloadSize
@@ -4352,6 +4365,12 @@ func (jd *HandleT) getUnprocessed(ctx context.Context, params GetQueryParamsT) (
params.PayloadSizeLimit -= unprocessedJobs.PayloadSize
}
}
unprocessedQueryTablesQueriedStat := stats.NewTaggedStat("tables_queried_gauge", stats.GaugeType, stats.Tags{
"state": "nonterminal",
"query": "unprocessed",
"customVal": jd.tablePrefix,
})
unprocessedQueryTablesQueriedStat.Gauge(dsQueryCount)
// Release lock
return completeUnprocessedJobs, nil
}
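
getUnprocessed above and GetProcessed below now share the same shape: dereference the optional dsLimit, stop iterating once that many datasets have actually been queried, count only datasets that were really hit (the new boolean return is false when the empty-result cache short-circuits a dataset), and report the final count through the tables_queried_gauge gauge. A condensed, self-contained sketch of that control flow; the dataset and result types are stand-ins, not the jobsdb types:

package main

import "fmt"

type dataset struct{ name string }

type jobsResult struct{ jobs []string }

// queryDS stands in for getUnprocessedJobsDS/getProcessedJobsDS: the boolean is
// false when the dataset was skipped (e.g. an empty-result cache hit), so its
// tables were never actually queried.
func queryDS(ds dataset) (jobsResult, bool, error) {
	if ds.name == "jobs_2" { // pretend the cache knows this dataset has nothing for us
		return jobsResult{}, false, nil
	}
	return jobsResult{jobs: []string{ds.name + ":job"}}, true, nil
}

// collect mirrors the loop added to getUnprocessed/GetProcessed.
func collect(dsList []dataset, dsLimitPtr *int) (jobsResult, int, error) {
	var out jobsResult
	dsQueryCount := 0
	var dsLimit int
	if dsLimitPtr != nil {
		dsLimit = *dsLimitPtr
	}
	for _, ds := range dsList {
		if dsLimit > 0 && dsQueryCount >= dsLimit {
			break // enough datasets were actually queried for this call
		}
		res, dsHit, err := queryDS(ds)
		if err != nil {
			return jobsResult{}, dsQueryCount, err
		}
		if dsHit {
			dsQueryCount++ // cache-skipped datasets do not consume the limit
		}
		out.jobs = append(out.jobs, res.jobs...)
	}
	// In jobsdb this count is what gets emitted as the tables_queried_gauge
	// gauge, tagged with the query type ("unprocessed" or "processed").
	return out, dsQueryCount, nil
}

func main() {
	limit := 2
	dsList := []dataset{{"jobs_1"}, {"jobs_2"}, {"jobs_3"}, {"jobs_4"}}
	res, queried, _ := collect(dsList, &limit)
	fmt.Println(res.jobs, queried) // prints [jobs_1:job jobs_3:job] 2; jobs_2 was skipped and did not count
}
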
@@ -4536,11 +4555,22 @@ func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (Jo
}

var completeProcessedJobs JobsResult
dsQueryCount := 0
var dsLimit int
if jd.dsLimit != nil {
dsLimit = *jd.dsLimit
}
for _, ds := range dsList {
processedJobs, err := jd.getProcessedJobsDS(ctx, ds, false, params)
if dsLimit > 0 && dsQueryCount >= dsLimit {
break
}
processedJobs, dsHit, err := jd.getProcessedJobsDS(ctx, ds, false, params)
if err != nil {
return JobsResult{}, err
}
if dsHit {
dsQueryCount++
}
completeProcessedJobs.Jobs = append(completeProcessedJobs.Jobs, processedJobs.Jobs...)
completeProcessedJobs.EventsCount += processedJobs.EventsCount
completeProcessedJobs.PayloadSize += processedJobs.PayloadSize
@@ -4560,7 +4590,12 @@ func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (Jo
params.PayloadSizeLimit -= processedJobs.PayloadSize
}
}

processedQueryTablesQueriedStat := stats.NewTaggedStat("tables_queried_gauge", stats.GaugeType, stats.Tags{
"state": "nonterminal",
"query": "processed",
"customVal": jd.tablePrefix,
})
processedQueryTablesQueriedStat.Gauge(dsQueryCount)
return completeProcessedJobs, nil
}
