From ae989cd3db3c39b08197a9238611aaf6382123ba Mon Sep 17 00:00:00 2001 From: Aris Tzoumas Date: Fri, 15 Sep 2023 16:06:00 +0300 Subject: [PATCH] chore(batchrouter): honour upload frequency when limitsReached if destination is failing (#3874) --- router/batchrouter/handle.go | 10 +++++++++- router/batchrouter/handle_lifecycle.go | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/router/batchrouter/handle.go b/router/batchrouter/handle.go index 40021aff89..1328d6c595 100644 --- a/router/batchrouter/handle.go +++ b/router/batchrouter/handle.go @@ -114,6 +114,9 @@ type Handle struct { lastExecTimesMu sync.RWMutex lastExecTimes map[string]time.Time + failingDestinationsMu sync.RWMutex + failingDestinations map[string]bool + batchRequestsMetricMu sync.RWMutex batchRequestsMetric []batchRequestMetric @@ -242,7 +245,8 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob if batchDest, ok := destinationsMap[destID]; ok { var processJobs bool brt.lastExecTimesMu.Lock() - if limitsReached { // if limits are reached, process all jobs regardless of their upload frequency + brt.failingDestinationsMu.RLock() + if limitsReached && !brt.failingDestinations[destID] { // if limits are reached and the destination is not failing, process all jobs regardless of their upload frequency processJobs = true } else { // honour upload frequency lastExecTime := brt.lastExecTimes[destID] @@ -251,6 +255,7 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob brt.lastExecTimes[destID] = time.Now() } } + brt.failingDestinationsMu.RUnlock() brt.lastExecTimesMu.Unlock() if processJobs { workerJobs = append(workerJobs, &DestinationJobs{destWithSources: *batchDest, jobs: destJobs}) @@ -571,6 +576,9 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err batchReqMetric.batchRequestSuccess = 1 } brt.trackRequestMetrics(batchReqMetric) + brt.failingDestinationsMu.Lock() + brt.failingDestinations[batchJobs.Connection.Destination.ID] = batchReqMetric.batchRequestFailed > 0 + brt.failingDestinationsMu.Unlock() var statusList []*jobsdb.JobStatusT if isWarehouse && notifyWarehouseErr { diff --git a/router/batchrouter/handle_lifecycle.go b/router/batchrouter/handle_lifecycle.go index 08b7dbcc41..44174c1904 100644 --- a/router/batchrouter/handle_lifecycle.go +++ b/router/batchrouter/handle_lifecycle.go @@ -107,6 +107,7 @@ func (brt *Handle) Setup( brt.encounteredMergeRuleMap = map[string]map[string]bool{} brt.uploadIntervalMap = map[string]time.Duration{} brt.lastExecTimes = map[string]time.Time{} + brt.failingDestinations = map[string]bool{} brt.dateFormatProvider = &storageDateFormatProvider{dateFormatsCache: make(map[string]string)} diagnosisTickerTime := config.GetDurationVar(600, time.Second, "Diagnostics.batchRouterTimePeriod", "Diagnostics.batchRouterTimePeriodInS") brt.diagnosisTicker = time.NewTicker(diagnosisTickerTime)