
fix: gw transient errors crash (#3397)
saurav-malani committed Jun 8, 2023
1 parent fbe109f commit 6ad51e8
Showing 13 changed files with 356 additions and 220 deletions.
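The new jobsdb option threaded through the handlers below, WithSkipMaintenanceErr, decides whether a failed maintenance operation (such as a dataset backup) is logged and skipped or crashes the process. The gateway jobsdb defaults the flag to true so transient errors no longer bring the gateway down, while the router, batch router, and processor jobsdbs default to false and keep the old panic behaviour. A minimal sketch of how such a functional option could be wired is below; the option name and the skipMaintenanceError field come from this diff, but the surrounding types are assumptions rather than the actual jobsdb code.

package jobsdb

// Sketch only: HandleT stands in for the real jobsdb handle and OptsFunc for
// its functional-option type; both shapes are assumptions.
type HandleT struct {
	skipMaintenanceError bool
	// ... other fields elided
}

type OptsFunc func(jd *HandleT)

// WithSkipMaintenanceErr controls whether maintenance failures are logged and
// skipped (true) or escalate to a panic (false).
func WithSkipMaintenanceErr(skip bool) OptsFunc {
	return func(jd *HandleT) {
		jd.skipMaintenanceError = skip
	}
}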
5 changes: 5 additions & 0 deletions app/apphandlers/embeddedAppHandler.go
@@ -140,6 +140,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
@@ -148,6 +149,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.routerDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
@@ -156,6 +158,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.batchRouterDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
)
defer batchRouterDB.Close()
errDB := jobsdb.NewForReadWrite(
@@ -164,11 +167,13 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
)
schemaDB := jobsdb.NewForReadWrite(
"esch",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
)

var tenantRouterDB jobsdb.MultiTenantJobsDB
2 changes: 2 additions & 0 deletions app/apphandlers/gatewayAppHandler.go
@@ -79,9 +79,11 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer gatewayDB.Close()

if err := gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gatewayDB: %w", err)
}
5 changes: 5 additions & 0 deletions app/apphandlers/processorAppHandler.go
@@ -148,6 +148,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
@@ -156,6 +157,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.routerDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
@@ -164,6 +166,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.batchRouterDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
)
defer batchRouterDB.Close()
errDB := jobsdb.NewForReadWrite(
@@ -172,11 +175,13 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
)
schemaDB := jobsdb.NewForReadWrite(
"esch",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)

var tenantRouterDB jobsdb.MultiTenantJobsDB
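Each handler reads its flag with config.GetBool, which falls back to the default shown when the key is not set anywhere. The snippet below only illustrates those defaults; the import path is an assumption based on the config.GetBool calls in this diff, and the env-var override named in the comment assumes rudder-server's usual RSERVER_* key mapping rather than anything this commit adds.

package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-server/config" // path assumed; the diff only shows config.GetBool
)

func main() {
	// GetBool returns the supplied default when the key is unset, so out of
	// the box only the gateway skips maintenance errors.
	fmt.Println(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true))  // true unless overridden
	fmt.Println(config.GetBool("Router.jobsDB.skipMaintenanceError", false))  // false unless overridden
	// Assuming the usual env mapping, the gateway flag could be disabled with
	// RSERVER_GATEWAY_JOBS_DB_SKIP_MAINTENANCE_ERROR=false.
}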
102 changes: 68 additions & 34 deletions jobsdb/backup.go
@@ -51,37 +51,65 @@ func (jd *HandleT) backupDSLoop(ctx context.Context) {
case <-ctx.Done():
return
}
jd.logger.Debugf("backupDSLoop backup enabled %s", jd.tablePrefix)
backupDSRange := jd.getBackupDSRange()
// check if non-empty dataset is present to back up
// else continue
sleepMultiplier = 1
if (dataSetRangeT{} == *backupDSRange) {
// sleep for more duration if no dataset is found
sleepMultiplier = 6
continue
}
loop := func() error {
jd.logger.Debugf("backupDSLoop backup enabled %s", jd.tablePrefix)
backupDSRange, err := jd.getBackupDSRange()
if err != nil {
return fmt.Errorf("[JobsDB] :: Failed to get backup dataset range. Err: %w", err)
}
// check if non-empty dataset is present to back up
// else continue
sleepMultiplier = 1
if (dataSetRangeT{} == *backupDSRange) {
// sleep for more duration if no dataset is found
sleepMultiplier = 6
return nil
}

backupDS := backupDSRange.ds
backupDS := backupDSRange.ds

opPayload, err := json.Marshal(&backupDS)
jd.assertError(err)
opPayload, err := json.Marshal(&backupDS)
jd.assertError(err)

opID := jd.JournalMarkStart(backupDSOperation, opPayload)
err = jd.backupDS(ctx, backupDSRange)
if err != nil {
jd.logger.Errorf("[JobsDB] :: Failed to backup jobs table %v. Err: %v", backupDSRange.ds.JobStatusTable, err)
opID, err := jd.JournalMarkStart(backupDSOperation, opPayload)
if err != nil {
return fmt.Errorf("mark start of backup operation: %w", err)
}
err = jd.backupDS(ctx, backupDSRange)
if err != nil {
return fmt.Errorf("backup dataset: %w", err)
}
err = jd.JournalMarkDone(opID)
if err != nil {
return fmt.Errorf("mark end of backup operation: %w", err)
}

// drop dataset after successfully uploading both jobs and jobs_status to s3
opID, err = jd.JournalMarkStart(backupDropDSOperation, opPayload)
if err != nil {
return fmt.Errorf("mark start of drop backup operation: %w", err)
}
// Currently, we retry uploading a table for some time and, if it fails, we drop only that table and not all `pre_drop` tables.
// So, when new tables are created faster than they are dropped, tables will still pile up.
// An easy fix: if the exponential retry fails at any point, then instead of dropping just that particular
// table, drop all subsequent `pre_drop` tables, since uploading the remaining tables will most likely fail with the same error.
err = jd.dropDS(backupDS)
if err != nil {
return fmt.Errorf("drop dataset: %w", err)
}
err = jd.JournalMarkDone(opID)
if err != nil {
return fmt.Errorf("mark end of drop backup operation: %w", err)
}
return nil
}
if err := loop(); err != nil && ctx.Err() == nil {
if !jd.skipMaintenanceError {
panic(err)
}
jd.logger.Errorf("[JobsDB] :: Failed to backup dataset. Err: %s", err.Error())
}
jd.JournalMarkDone(opID)

// drop dataset after successfully uploading both jobs and jobs_status to s3
opID = jd.JournalMarkStart(backupDropDSOperation, opPayload)
// Currently, we retry uploading a table for some time and, if it fails, we drop only that table and not all `pre_drop` tables.
// So, when new tables are created faster than they are dropped, tables will still pile up.
// An easy fix: if the exponential retry fails at any point, then instead of dropping just that particular
// table, drop all subsequent `pre_drop` tables, since uploading the remaining tables will most likely fail with the same error.
jd.mustDropDS(backupDS)
jd.JournalMarkDone(opID)
}
}
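Note the ctx.Err() == nil guard above: an error raised only because the context was cancelled during shutdown is neither logged as a backup failure nor escalated to a panic. A stripped-down, hypothetical version of that guard (names invented for illustration, not taken from jobsdb) looks like this:

package jobsdb

import "context"

// runGuarded mirrors the shape of the check in backupDSLoop: run one
// maintenance step, ignore errors caused purely by shutdown, and otherwise
// either log or panic depending on the skip flag.
func runGuarded(ctx context.Context, step func() error, skip bool, logf func(format string, args ...interface{})) {
	if err := step(); err != nil && ctx.Err() == nil {
		if !skip {
			panic(err) // previous behaviour: a maintenance failure crashes the process
		}
		logf("maintenance step failed: %v", err)
	}
}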

@@ -619,13 +647,15 @@ func (jd *HandleT) backupUploadWithExponentialBackoff(ctx context.Context, file
return output, err
}

func (jd *HandleT) getBackupDSRange() *dataSetRangeT {
func (jd *HandleT) getBackupDSRange() (*dataSetRangeT, error) {
var backupDS dataSetT
var backupDSRange dataSetRangeT

// Read the table names from PG
tableNames := mustGetAllTableNames(jd, jd.dbHandle)

tableNames, err := getAllTableNames(jd.dbHandle)
if err != nil {
return nil, fmt.Errorf("getAllTableNames: %w", err)
}
// We check for job_status because that is renamed after job
var dnumList []string
for _, t := range tableNames {
Expand All @@ -636,7 +666,7 @@ func (jd *HandleT) getBackupDSRange() *dataSetRangeT {
}
}
if len(dnumList) == 0 {
return &backupDSRange
return &backupDSRange, nil
}
jd.statPreDropTableCount.Gauge(len(dnumList))

@@ -651,14 +681,18 @@ func (jd *HandleT) getBackupDSRange() *dataSetRangeT {
var minID, maxID sql.NullInt64
jobIDSQLStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) from %q`, backupDS.JobTable)
row := jd.dbHandle.QueryRow(jobIDSQLStatement)
err := row.Scan(&minID, &maxID)
jd.assertError(err)
err = row.Scan(&minID, &maxID)
if err != nil {
return nil, fmt.Errorf("getting min and max job_id: %w", err)
}

var minCreatedAt, maxCreatedAt time.Time
jobTimeSQLStatement := fmt.Sprintf(`SELECT MIN(created_at), MAX(created_at) from %q`, backupDS.JobTable)
row = jd.dbHandle.QueryRow(jobTimeSQLStatement)
err = row.Scan(&minCreatedAt, &maxCreatedAt)
jd.assertError(err)
if err != nil {
return nil, fmt.Errorf("getting min and max created_at: %w", err)
}

backupDSRange = dataSetRangeT{
minJobID: minID.Int64,
Expand All @@ -667,5 +701,5 @@ func (jd *HandleT) getBackupDSRange() *dataSetRangeT {
endTime: maxCreatedAt.UnixNano() / int64(time.Millisecond),
ds: backupDS,
}
return &backupDSRange
return &backupDSRange, nil
}
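getBackupDSRange now calls getAllTableNames(jd.dbHandle) and propagates its error, replacing the old mustGetAllTableNames assertion. The helper's body is not part of this diff; the sketch below is one plausible shape for it, assuming a plain pg_tables query, and may differ from the real implementation.

package jobsdb

import (
	"database/sql"
	"fmt"
)

// getAllTableNames (hypothetical sketch): list the tables visible in the
// current schema, returning errors to the caller instead of panicking.
func getAllTableNames(dbHandle *sql.DB) ([]string, error) {
	rows, err := dbHandle.Query(
		`SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = current_schema()`)
	if err != nil {
		return nil, fmt.Errorf("querying pg_tables: %w", err)
	}
	defer func() { _ = rows.Close() }()

	var tableNames []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, fmt.Errorf("scanning table name: %w", err)
		}
		tableNames = append(tableNames, name)
	}
	return tableNames, rows.Err()
}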
