Skip to content

Commit

Permalink
chore(batchrouter): honour upload frequency when limitsReached if destination is failing (#3874)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Sep 15, 2023
1 parent 12f4d83 commit ae989cd
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
10 changes: 9 additions & 1 deletion router/batchrouter/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ type Handle struct {
lastExecTimesMu sync.RWMutex
lastExecTimes map[string]time.Time

failingDestinationsMu sync.RWMutex
failingDestinations map[string]bool

batchRequestsMetricMu sync.RWMutex
batchRequestsMetric []batchRequestMetric

Expand Down Expand Up @@ -242,7 +245,8 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
if batchDest, ok := destinationsMap[destID]; ok {
var processJobs bool
brt.lastExecTimesMu.Lock()
if limitsReached { // if limits are reached, process all jobs regardless of their upload frequency
brt.failingDestinationsMu.RLock()
if limitsReached && !brt.failingDestinations[destID] { // if limits are reached and the destination is not failing, process all jobs regardless of their upload frequency
processJobs = true
} else { // honour upload frequency
lastExecTime := brt.lastExecTimes[destID]
Expand All @@ -251,6 +255,7 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
brt.lastExecTimes[destID] = time.Now()
}
}
brt.failingDestinationsMu.RUnlock()
brt.lastExecTimesMu.Unlock()
if processJobs {
workerJobs = append(workerJobs, &DestinationJobs{destWithSources: *batchDest, jobs: destJobs})
Expand Down Expand Up @@ -571,6 +576,9 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
batchReqMetric.batchRequestSuccess = 1
}
brt.trackRequestMetrics(batchReqMetric)
brt.failingDestinationsMu.Lock()
brt.failingDestinations[batchJobs.Connection.Destination.ID] = batchReqMetric.batchRequestFailed > 0
brt.failingDestinationsMu.Unlock()
var statusList []*jobsdb.JobStatusT

if isWarehouse && notifyWarehouseErr {
Expand Down
1 change: 1 addition & 0 deletions router/batchrouter/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (brt *Handle) Setup(
brt.encounteredMergeRuleMap = map[string]map[string]bool{}
brt.uploadIntervalMap = map[string]time.Duration{}
brt.lastExecTimes = map[string]time.Time{}
brt.failingDestinations = map[string]bool{}
brt.dateFormatProvider = &storageDateFormatProvider{dateFormatsCache: make(map[string]string)}
diagnosisTickerTime := config.GetDurationVar(600, time.Second, "Diagnostics.batchRouterTimePeriod", "Diagnostics.batchRouterTimePeriodInS")
brt.diagnosisTicker = time.NewTicker(diagnosisTickerTime)
Expand Down

0 comments on commit ae989cd

Please sign in to comment.