chore: use new reloadable config api in router and batchrouter (#3871)
atzoum committed Sep 15, 2023
1 parent d254bc2 commit 12f4d83
Showing 18 changed files with 277 additions and 256 deletions.
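Every change below follows the same pattern: configuration fields that were copied into the Handle once at setup (plain int, time.Duration, and string values) become *config.Reloadable[T] handles from rudder-go-kit, and each use reads the current value through .Load(), so a setting can change at runtime without a restart. Below is a minimal sketch of that API as the diff uses it; the key name Example.uploadFreq is illustrative only (the real keys are registered in setup code outside the hunks shown), and the sketch assumes config.Set propagates to already-registered reloadable variables, which is the behaviour the hot-reload API is meant to provide.

package main

import (
	"fmt"
	"time"

	"github.com/rudderlabs/rudder-go-kit/config"
)

func main() {
	// Register a reloadable duration: default 30*time.Second, backed by an
	// illustrative key (an assumption, not one of the commit's real keys).
	uploadFreq := config.GetReloadableDurationVar(30, time.Second, "Example.uploadFreq")

	fmt.Println(uploadFreq.Load()) // 30s, the default, since the key is unset

	// Assuming Set triggers the hot-reload path, the next Load() sees the
	// new value; that is why the hot loops in handle.go call .Load() on
	// every use instead of caching the value in a local variable.
	config.Set("Example.uploadFreq", "5s")
	fmt.Println(uploadFreq.Load()) // 5s
}

This is why fields such as payloadLimit, jobQueryBatchSize, and uploadFreq in handle.go now carry the Reloadable type and are dereferenced with .Load() at each query or upload decision.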
2 changes: 1 addition & 1 deletion go.mod
@@ -72,7 +72,7 @@ require (
github.com/rs/cors v1.9.0
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/rudderlabs/compose-test v0.1.3
github.com/rudderlabs/rudder-go-kit v0.15.9
github.com/rudderlabs/rudder-go-kit v0.15.11
github.com/rudderlabs/sql-tunnels v0.1.4
github.com/samber/lo v1.38.1
github.com/segmentio/kafka-go v0.4.42
4 changes: 2 additions & 2 deletions go.sum
@@ -1774,8 +1774,8 @@ github.com/rudderlabs/compose-test v0.1.3 h1:uyep6jDCIF737sfv4zIaMsKRQKX95IDz5Xb
github.com/rudderlabs/compose-test v0.1.3/go.mod h1:tuvS1eQdSfwOYv1qwyVAcpdJxPLQXJgy5xGDd/9XmMg=
github.com/rudderlabs/parquet-go v0.0.2 h1:ZXRdZdimB0PdJtmxeSSxfI0fDQ3kZjwzBxRi6Ut1J8k=
github.com/rudderlabs/parquet-go v0.0.2/go.mod h1:g6guum7o8uhj/uNhunnt7bw5Vabu/goI5i21/3fnxWQ=
github.com/rudderlabs/rudder-go-kit v0.15.9 h1:i1dUUL1JLAOOn+2RgVq0K6UnTn6X4OZMTiZPZZBKmTQ=
github.com/rudderlabs/rudder-go-kit v0.15.9/go.mod h1:W7b3rft7Kt8TvNd4gguw5ZB9Hu4M1k60EGLv91+N2zM=
github.com/rudderlabs/rudder-go-kit v0.15.11 h1:pvZMQWgCVeUmCJVWIkJ2hIiDEjVDYg0Tx5VTJ6X4feM=
github.com/rudderlabs/rudder-go-kit v0.15.11/go.mod h1:W7b3rft7Kt8TvNd4gguw5ZB9Hu4M1k60EGLv91+N2zM=
github.com/rudderlabs/sql-tunnels v0.1.4 h1:snnkUItx3nRCPGSouibkYzBpOcEiWhCVjRY95A0eMuk=
github.com/rudderlabs/sql-tunnels v0.1.4/go.mod h1:C6O0M6C3V+pdVwjy3UvyPd+d5SeRvbBQfApm+67qNnI=
github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY=
11 changes: 5 additions & 6 deletions router/batchrouter/batchrouter_test.go
@@ -24,12 +24,12 @@ import (
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/filemanager/mock_filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/rudderlabs/rudder-server/admin"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
mocksJobsDB "github.com/rudderlabs/rudder-server/mocks/jobsdb"
router_utils "github.com/rudderlabs/rudder-server/router/utils"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/misc"
@@ -147,7 +147,7 @@ var _ = Describe("BatchRouter", func() {

BeforeEach(func() {
config.Reset()
router_utils.JobRetention = time.Duration(175200) * time.Hour // 20 Years(20*365*24)
config.Set("Router.jobRetention", "175200h") // 20 Years(20*365*24)
c = &testContext{}
c.Setup()
})
@@ -178,7 +178,6 @@ var _ = Describe("BatchRouter", func() {
batchrouter := &Handle{}
batchrouter.Setup(s3DestinationDefinition.Name, c.mockBackendConfig, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, transientsource.NewEmptyService(), rsources.NewNoOpService(), destinationdebugger.NewNoOpService())

batchrouter.readPerDestination = false
batchrouter.fileManagerFactory = c.mockFileManagerFactory

c.mockFileManager.EXPECT().Upload(gomock.Any(), gomock.Any(), gomock.Any()).Return(filemanager.UploadedFile{Location: "local", ObjectName: "file"}, nil)
@@ -236,7 +235,7 @@ var _ = Describe("BatchRouter", func() {

payloadLimit := batchrouter.payloadLimit
var getJobsListCalled bool
c.mockBatchRouterJobsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, states []string, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
c.mockBatchRouterJobsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit.Load()}).DoAndReturn(func(ctx context.Context, states []string, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
var res jobsdb.JobsResult
if !getJobsListCalled {
getJobsListCalled = true
@@ -267,8 +266,8 @@ var _ = Describe("BatchRouter", func() {
c.mockBatchRouterJobsDB.EXPECT().JournalDeleteEntry(gomock.Any()).Times(1)

<-batchrouter.backendConfigInitialized
batchrouter.minIdleSleep = time.Microsecond
batchrouter.uploadFreq = time.Microsecond
batchrouter.minIdleSleep = config.GetReloadableDurationVar(1, time.Microsecond, rand.UniqueString(10))
batchrouter.uploadFreq = config.GetReloadableDurationVar(1, time.Microsecond, rand.UniqueString(10))
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
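The test file above shows the two ways the suite now pins these settings: writing the real key before Setup (config.Set("Router.jobRetention", "175200h")), so the handle reads it when it registers the variable, and swapping a field after Setup for a throwaway reloadable variable keyed by a random name, so its default becomes the effective value for that test alone. A hedged sketch of the second trick follows; the standalone handle struct is illustrative, not the real batchrouter Handle.

package main

import (
	"time"

	"github.com/rudderlabs/rudder-go-kit/config"
	"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
)

// handle is an illustrative stand-in for the batchrouter Handle fields
// being overridden in the test above.
type handle struct {
	minIdleSleep *config.Reloadable[time.Duration]
	uploadFreq   *config.Reloadable[time.Duration]
}

func main() {
	h := &handle{
		// rand.UniqueString(10) produces a key nothing else writes to, so
		// Load() always returns the default (1 microsecond here) and the
		// override stays local to this handle.
		minIdleSleep: config.GetReloadableDurationVar(1, time.Microsecond, rand.UniqueString(10)),
		uploadFreq:   config.GetReloadableDurationVar(1, time.Microsecond, rand.UniqueString(10)),
	}
	_ = h.minIdleSleep.Load()
	_ = h.uploadFreq.Load()
}

Either style avoids the package-level mutation the old test relied on (router_utils.JobRetention), which is why the router/utils import disappears from the test file.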
74 changes: 35 additions & 39 deletions router/batchrouter/handle.go
@@ -68,26 +68,24 @@ type Handle struct {

maxEventsInABatch int
maxPayloadSizeInBytes int
maxFailedCountForJob int
asyncUploadTimeout time.Duration
retryTimeWindow time.Duration
maxFailedCountForJob *config.Reloadable[int]
asyncUploadTimeout *config.Reloadable[time.Duration]
retryTimeWindow *config.Reloadable[time.Duration]
reportingEnabled bool
jobQueryBatchSize int
pollStatusLoopSleep time.Duration
payloadLimit int64
jobsDBCommandTimeout time.Duration
jobdDBQueryRequestTimeout time.Duration
jobdDBMaxRetries int
minIdleSleep time.Duration
uploadFreq time.Duration
forceHonorUploadFrequency bool
readPerDestination bool
jobQueryBatchSize *config.Reloadable[int]
pollStatusLoopSleep *config.Reloadable[time.Duration]
payloadLimit *config.Reloadable[int64]
jobsDBCommandTimeout *config.Reloadable[time.Duration]
jobdDBQueryRequestTimeout *config.Reloadable[time.Duration]
jobdDBMaxRetries *config.Reloadable[int]
minIdleSleep *config.Reloadable[time.Duration]
uploadFreq *config.Reloadable[time.Duration]
disableEgress bool
toAbortDestinationIDs string
warehouseServiceMaxRetryTime time.Duration
toAbortDestinationIDs *config.Reloadable[string]
warehouseServiceMaxRetryTime *config.Reloadable[time.Duration]
transformerURL string
datePrefixOverride string
customDatePrefix string
datePrefixOverride *config.Reloadable[string]
customDatePrefix *config.Reloadable[string]

// state

@@ -148,7 +146,7 @@ func (brt *Handle) mainLoop(ctx context.Context) {
for _, partition := range brt.activePartitions(ctx) {
pool.PingWorker(partition)
}
mainLoopSleep = brt.uploadFreq
mainLoopSleep = brt.uploadFreq.Load()
}
}
}
@@ -177,7 +175,6 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
destinationsMap := brt.destinationsMap
brt.configSubscriberMu.RUnlock()
var jobs []*jobsdb.JobT
limit := brt.jobQueryBatchSize

var firstJob *jobsdb.JobT
var lastJob *jobsdb.JobT
@@ -186,14 +183,14 @@
queryStart := time.Now()
queryParams := jobsdb.GetQueryParams{
CustomValFilters: []string{brt.destType},
JobsLimit: limit,
PayloadSizeLimit: brt.adaptiveLimit(brt.payloadLimit),
JobsLimit: brt.jobQueryBatchSize.Load(),
PayloadSizeLimit: brt.adaptiveLimit(brt.payloadLimit.Load()),
}
brt.isolationStrategy.AugmentQueryParams(partition, &queryParams)
var limitsReached bool

if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of sinle query
toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
@@ -203,7 +200,7 @@
jobs = toProcess.Jobs
limitsReached = toProcess.LimitsReached
} else {
toRetry, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
toRetry, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetFailed(ctx, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
@@ -217,7 +214,7 @@
if queryParams.PayloadSizeLimit > 0 {
queryParams.PayloadSizeLimit -= toRetry.PayloadSize
}
unprocessed, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
unprocessed, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetUnprocessed(ctx, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
@@ -245,11 +242,11 @@
if batchDest, ok := destinationsMap[destID]; ok {
var processJobs bool
brt.lastExecTimesMu.Lock()
if limitsReached && !brt.forceHonorUploadFrequency { // if limits are reached, process all jobs regardless of their upload frequency
if limitsReached { // if limits are reached, process all jobs regardless of their upload frequency
processJobs = true
} else { // honour upload frequency
lastExecTime := brt.lastExecTimes[destID]
if lastExecTime.IsZero() || time.Since(lastExecTime) >= brt.uploadFreq {
if lastExecTime.IsZero() || time.Since(lastExecTime) >= brt.uploadFreq.Load() {
processJobs = true
brt.lastExecTimes[destID] = time.Now()
}
@@ -406,8 +403,8 @@ func (brt *Handle) upload(provider string, batchJobs *BatchedJobs, isWarehouse b
}

var datePrefixLayout string
if brt.datePrefixOverride != "" {
datePrefixLayout = brt.datePrefixOverride
if brt.datePrefixOverride.Load() != "" {
datePrefixLayout = brt.datePrefixOverride.Load()
} else {
dateFormat, _ := brt.dateFormatProvider.GetFormat(brt.logger, uploader, batchJobs.Connection, folderName)
datePrefixLayout = dateFormat
@@ -420,7 +417,7 @@
default:
datePrefixLayout = time.Now().Format("2006-01-02")
}
keyPrefixes := []string{folderName, batchJobs.Connection.Source.ID, brt.customDatePrefix + datePrefixLayout}
keyPrefixes := []string{folderName, batchJobs.Connection.Source.ID, brt.customDatePrefix.Load() + datePrefixLayout}

_, fileName := filepath.Split(gzipFilePath)
var (
@@ -622,8 +619,8 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
timeElapsed := time.Since(firstAttemptedAt)
switch jobState {
case jobsdb.Failed.State:
if !notifyWarehouseErr && timeElapsed > brt.retryTimeWindow && job.LastJobStatus.AttemptNum >= brt.
maxFailedCountForJob {
if !notifyWarehouseErr && timeElapsed > brt.retryTimeWindow.Load() && job.LastJobStatus.AttemptNum >= brt.maxFailedCountForJob.Load() {
job.Parameters = misc.UpdateJSONWithNewKeyVal(job.Parameters, "stage", "batch_router")
job.Parameters = misc.UpdateJSONWithNewKeyVal(job.Parameters, "reason", errOccurred.Error())
abortedEvents = append(abortedEvents, job)
@@ -632,7 +628,7 @@
if notifyWarehouseErr && isWarehouse {
// change job state to abort state after warehouse service is continuously failing more than warehouseServiceMaxRetryTimeinHr time
brt.warehouseServiceFailedTimeMu.RLock()
if time.Since(brt.warehouseServiceFailedTime) > brt.warehouseServiceMaxRetryTime {
if time.Since(brt.warehouseServiceFailedTime) > brt.warehouseServiceMaxRetryTime.Load() {
job.Parameters = misc.UpdateJSONWithNewKeyVal(job.Parameters, "stage", "batch_router")
job.Parameters = misc.UpdateJSONWithNewKeyVal(job.Parameters, "reason", errOccurred.Error())
abortedEvents = append(abortedEvents, job)
@@ -722,7 +718,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err

// Store the aborted jobs to errorDB
if abortedEvents != nil {
err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) error {
err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
return brt.errorDB.Store(ctx, abortedEvents)
}, brt.sendRetryStoreStats)
if err != nil {
@@ -759,7 +755,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
// REPORTING - END

// Mark the status of the jobs
err = misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) error {
err = misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
return brt.jobsDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error {
err = brt.jobsDB.UpdateJobStatusInTx(ctx, tx, statusList, []string{brt.destType}, parameterFilters)
if err != nil {
@@ -791,17 +787,17 @@ func (brt *Handle) uploadInterval(destinationConfig map[string]interface{}) time
uploadInterval, ok := destinationConfig["uploadInterval"]
if !ok {
brt.logger.Debugf("BRT: uploadInterval not found in destination config, falling back to default: %s", brt.asyncUploadTimeout)
return brt.asyncUploadTimeout
return brt.asyncUploadTimeout.Load()
}
dur, ok := uploadInterval.(string)
if !ok {
brt.logger.Warnf("BRT: not found string type uploadInterval, falling back to default: %s", brt.asyncUploadTimeout)
return brt.asyncUploadTimeout
return brt.asyncUploadTimeout.Load()
}
parsedTime, err := strconv.ParseInt(dur, 10, 64)
if err != nil {
brt.logger.Warnf("BRT: Couldn't parseint uploadInterval, falling back to default: %s", brt.asyncUploadTimeout)
return brt.asyncUploadTimeout
return brt.asyncUploadTimeout.Load()
}
return time.Duration(parsedTime * int64(time.Minute))
}
@@ -812,10 +808,10 @@ func (brt *Handle) skipFetchingJobs(partition string) bool {
queryParams := jobsdb.GetQueryParams{
CustomValFilters: []string{brt.destType},
JobsLimit: 1,
PayloadSizeLimit: brt.adaptiveLimit(brt.payloadLimit),
PayloadSizeLimit: brt.adaptiveLimit(brt.payloadLimit.Load()),
}
brt.isolationStrategy.AugmentQueryParams(partition, &queryParams)
importingList, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
importingList, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetImporting(ctx, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
16 changes: 8 additions & 8 deletions router/batchrouter/handle_async.go
@@ -27,14 +27,14 @@ import (

func (brt *Handle) getImportingJobs(ctx context.Context, destinationID string, limit int) (jobsdb.JobsResult, error) {
parameterFilters := []jobsdb.ParameterFilterT{{Name: "destination_id", Value: destinationID}}
return misc.QueryWithRetriesAndNotify(ctx, brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
return misc.QueryWithRetriesAndNotify(ctx, brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetImporting(
ctx,
jobsdb.GetQueryParams{
CustomValFilters: []string{brt.destType},
JobsLimit: limit,
ParameterFilters: parameterFilters,
PayloadSizeLimit: brt.adaptiveLimit(brt.payloadLimit),
PayloadSizeLimit: brt.adaptiveLimit(brt.payloadLimit.Load()),
},
)
}, brt.sendQueryRetryStats)
@@ -44,7 +44,7 @@ func (brt *Handle) updateJobStatuses(ctx context.Context, destinationID string,
reportMetrics := brt.getReportMetrics(statusList, brt.getParamertsFromJobs(allJobs))

parameterFilters := []jobsdb.ParameterFilterT{{Name: "destination_id", Value: destinationID}}
return misc.RetryWithNotify(ctx, brt.jobsDBCommandTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) error {
return misc.RetryWithNotify(ctx, brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
return brt.jobsDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error {
err := brt.jobsDB.UpdateJobStatusInTx(ctx, tx, statusList, []string{brt.destType}, parameterFilters)
if err != nil {
@@ -115,7 +115,7 @@ func (brt *Handle) prepareJobStatusList(importingList []*jobsdb.JobT, defaultSta

if defaultStatus.JobState == jobsdb.Failed.State {
timeElapsed := time.Since(firstAttemptedAt)
if timeElapsed > brt.retryTimeWindow && job.LastJobStatus.AttemptNum >= brt.maxFailedCountForJob {
if timeElapsed > brt.retryTimeWindow.Load() && job.LastJobStatus.AttemptNum >= brt.maxFailedCountForJob.Load() {
status.JobState = jobsdb.Aborted.State
abortedJobsList = append(abortedJobsList, job)
}
@@ -215,7 +215,7 @@ func (brt *Handle) updatePollStatusToDB(ctx context.Context, destinationID strin
brt.asyncSuccessfulJobCount.Count(len(statusList) - len(abortedJobs))
brt.asyncAbortedJobCount.Count(len(abortedJobs))
if len(abortedJobs) > 0 {
err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) error {
err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
return brt.errorDB.Store(ctx, abortedJobs)
}, brt.sendRetryStoreStats)
if err != nil {
@@ -255,7 +255,7 @@ func (brt *Handle) pollAsyncStatus(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(brt.pollStatusLoopSleep):
case <-time.After(brt.pollStatusLoopSleep.Load()):
brt.configSubscriberMu.RLock()
destinationsMap := brt.destinationsMap
brt.configSubscriberMu.RUnlock()
@@ -633,7 +633,7 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
}

timeElapsed := time.Since(firstAttemptedAts[jobId])
if timeElapsed > brt.retryTimeWindow && attemptNums[jobId] >= brt.maxFailedCountForJob {
if timeElapsed > brt.retryTimeWindow.Load() && attemptNums[jobId] >= brt.maxFailedCountForJob.Load() {
status.JobState = jobsdb.Aborted.State
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, originalJobParameters[jobId]))
}
@@ -673,7 +673,7 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
reportMetrics := brt.getReportMetrics(statusList, originalJobParameters)

// Mark the status of the jobs
err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) error {
err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
return brt.jobsDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error {
err := brt.jobsDB.UpdateJobStatusInTx(ctx, tx, statusList, []string{brt.destType}, parameterFilters)
if err != nil {