diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 138e00dd51..d718b6d3fe 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -476,9 +476,8 @@ type journalOpPayloadT struct { } type ParameterFilterT struct { - Name string - Value string - Optional bool + Name string + Value string } var dbInvalidJsonErrors = map[string]struct{}{ diff --git a/jobsdb/jobsdb_utils.go b/jobsdb/jobsdb_utils.go index e03cb44080..15e297acae 100644 --- a/jobsdb/jobsdb_utils.go +++ b/jobsdb/jobsdb_utils.go @@ -156,11 +156,7 @@ func constructParameterJSONQuery(alias string, parameterFilters []ParameterFilte var allKeyValues, mandatoryKeyValues, opNullConditions []string for _, parameter := range parameterFilters { allKeyValues = append(allKeyValues, fmt.Sprintf(`%q:%q`, parameter.Name, parameter.Value)) - if parameter.Optional { - opNullConditions = append(opNullConditions, fmt.Sprintf(`%q.parameters -> '%s' IS NULL`, alias, parameter.Name)) - } else { - mandatoryKeyValues = append(mandatoryKeyValues, fmt.Sprintf(`%q:%q`, parameter.Name, parameter.Value)) - } + mandatoryKeyValues = append(mandatoryKeyValues, fmt.Sprintf(`%q:%q`, parameter.Name, parameter.Value)) } opQuery := "" if len(opNullConditions) > 0 { diff --git a/router/batchrouter/batchrouter.go b/router/batchrouter/batchrouter.go index 88b82823fc..9770856aba 100644 --- a/router/batchrouter/batchrouter.go +++ b/router/batchrouter/batchrouter.go @@ -315,9 +315,8 @@ func (brt *HandleT) pollAsyncStatus(ctx context.Context) { parameterFilters := make([]jobsdb.ParameterFilterT, 0) for _, param := range jobsdb.CacheKeyParameterFilters { parameterFilter := jobsdb.ParameterFilterT{ - Name: param, - Value: key, - Optional: false, + Name: param, + Value: key, } parameterFilters = append(parameterFilters, parameterFilter) } @@ -1314,9 +1313,8 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc if readPerDestination { parameterFilters = []jobsdb.ParameterFilterT{ { - Name: "destination_id", - Value: batchJobs.BatchDestination.Destination.ID, - Optional: false, + Name: "destination_id", + Value: batchJobs.BatchDestination.Destination.ID, }, } } @@ -1467,9 +1465,8 @@ func (brt *HandleT) setMultipleJobStatus(asyncOutput asyncdestinationmanager.Asy parameterFilters := []jobsdb.ParameterFilterT{ { - Name: "destination_id", - Value: asyncOutput.DestinationID, - Optional: false, + Name: "destination_id", + Value: asyncOutput.DestinationID, }, } @@ -1593,9 +1590,8 @@ func (worker *workerT) constructParameterFilters(batchDest router_utils.BatchDes parameterFilters := make([]jobsdb.ParameterFilterT, 0) for _, key := range jobsdb.CacheKeyParameterFilters { parameterFilter := jobsdb.ParameterFilterT{ - Name: key, - Value: worker.getValueForParameter(batchDest, key), - Optional: false, + Name: key, + Value: worker.getValueForParameter(batchDest, key), } parameterFilters = append(parameterFilters, parameterFilter) } diff --git a/router/router.go b/router/router.go index aa8cbc22fb..b1de3863f0 100644 --- a/router/router.go +++ b/router/router.go @@ -400,8 +400,8 @@ func (worker *workerT) workerProcess() { // mark job as waiting if prev job from same user has not succeeded yet worker.rt.logger.Debugf( - "[%v Router] :: skipping processing job for userID: %v since prev failed job exists, prev id %v, current id %v", - worker.rt.destName, userID, previousFailedJobID, job.JobID, + "[%v Router] :: skipping processing job for orderKey: %v since prev failed job exists, prev id %v, current id %v", + worker.rt.destName, orderKey, previousFailedJobID, job.JobID, ) resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "blocking_id", *previousFailedJobID) resp = misc.UpdateJSONWithNewKeyVal(resp, "user_id", userID) @@ -1200,31 +1200,29 @@ func (rt *HandleT) stopWorkers() { } } -func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledUserMap map[string]struct{}) (toSendWorker *workerT) { +func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledOrderKeys map[string]struct{}) (toSendWorker *workerT) { if rt.backgroundCtx.Err() != nil { return } - // checking if this job can be throttled var parameters JobParametersT - userID := job.UserID - - // checking if the user is in throttledMap. If yes, returning nil. - // this check is done to maintain order. - if _, ok := throttledUserMap[userID]; ok && rt.guaranteeUserEventOrder { - rt.logger.Debugf(`[%v Router] :: Skipping processing of job:%d of user:%s as user has earlier jobs in throttled map`, rt.destName, job.JobID, userID) - return nil - } - err := json.Unmarshal(job.Parameters, ¶meters) if err != nil { rt.logger.Errorf(`[%v Router] :: Unmarshalling parameters failed with the error %v . Returning nil worker`, err) return } + orderKey := jobOrderKey(job.UserID, parameters.DestinationID) + + // checking if the orderKey is in throttledMap. If yes, returning nil. + // this check is done to maintain order. + if _, ok := throttledOrderKeys[orderKey]; ok { + rt.logger.Debugf(`[%v Router] :: Skipping processing of job:%d of orderKey:%s as orderKey has earlier jobs in throttled map`, rt.destName, job.JobID, orderKey) + return nil + } if !rt.guaranteeUserEventOrder { // if guaranteeUserEventOrder is false, assigning worker randomly and returning here. - if rt.shouldThrottle(job, parameters, throttledUserMap) { + if rt.shouldThrottle(job, parameters) { return } toSendWorker = rt.workers[rand.Intn(rt.noOfWorkers)] // skipcq: GSC-G404 @@ -1232,16 +1230,16 @@ func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledUserMap map[string]stru } //#JobOrder (see other #JobOrder comment) - index := rt.getWorkerPartition(userID) + index := rt.getWorkerPartition(orderKey) worker := rt.workers[index] if worker.canBackoff(job) { return } - orderKey := jobOrderKey(userID, parameters.DestinationID) enter, previousFailedJobID := worker.barrier.Enter(orderKey, job.JobID) if enter { - rt.logger.Debugf("EventOrder: job %d of user %s is allowed to be processed", job.JobID, userID) - if rt.shouldThrottle(job, parameters, throttledUserMap) { + rt.logger.Debugf("EventOrder: job %d of orderKey %s is allowed to be processed", job.JobID, orderKey) + if rt.shouldThrottle(job, parameters) { + throttledOrderKeys[orderKey] = struct{}{} worker.barrier.Leave(orderKey, job.JobID) return } @@ -1252,7 +1250,7 @@ func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledUserMap map[string]stru if previousFailedJobID != nil { previousFailedJobIDStr = strconv.FormatInt(*previousFailedJobID, 10) } - rt.logger.Debugf("EventOrder: job %d of user %s is blocked (previousFailedJobID: %s)", job.JobID, userID, previousFailedJobIDStr) + rt.logger.Debugf("EventOrder: job %d of orderKey %s is blocked (previousFailedJobID: %s)", job.JobID, orderKey, previousFailedJobIDStr) return nil //#EndJobOrder } @@ -1268,11 +1266,11 @@ func (worker *workerT) canBackoff(job *jobsdb.JobT) (shouldBackoff bool) { return false } -func (rt *HandleT) getWorkerPartition(userID string) int { - return misc.GetHash(userID) % rt.noOfWorkers +func (rt *HandleT) getWorkerPartition(key string) int { + return misc.GetHash(key) % rt.noOfWorkers } -func (rt *HandleT) shouldThrottle(job *jobsdb.JobT, parameters JobParametersT, throttledUserMap map[string]struct{}) ( +func (rt *HandleT) shouldThrottle(job *jobsdb.JobT, parameters JobParametersT) ( limited bool, ) { if rt.throttlerFactory == nil { @@ -1295,7 +1293,6 @@ func (rt *HandleT) shouldThrottle(job *jobsdb.JobT, parameters JobParametersT, t return false } if limited { - throttledUserMap[job.UserID] = struct{}{} rt.throttledStat.Count(1) rt.logger.Debugf( "[%v Router] :: Skipping processing of job:%d of user:%s as throttled limits exceeded", @@ -1430,7 +1427,7 @@ func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) { // Update the status err := misc.RetryWithNotify(context.Background(), rt.jobsDBCommandTimeout, rt.jobdDBMaxRetries, func(ctx context.Context) error { return rt.jobsDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error { - err := rt.jobsDB.UpdateJobStatusInTx(ctx, tx, statusList, []string{rt.destName}, nil) + err := rt.jobsDB.UpdateJobStatusInTx(ctx, tx, statusList, []string{rt.destName}, rt.parameterFilters()) if err != nil { return fmt.Errorf("updating %s jobs statuses: %w", rt.destName, err) } @@ -1648,12 +1645,8 @@ func (rt *HandleT) generatorLoop(ctx context.Context) { func (rt *HandleT) getQueryParams(pickUpCount int) jobsdb.GetQueryParamsT { if rt.destinationId != rt.destName { return jobsdb.GetQueryParamsT{ - CustomValFilters: []string{rt.destName}, - ParameterFilters: []jobsdb.ParameterFilterT{{ - Name: "destination_id", - Value: rt.destinationId, - Optional: false, - }}, + CustomValFilters: []string{rt.destName}, + ParameterFilters: rt.parameterFilters(), IgnoreCustomValFiltersInQuery: true, PayloadSizeLimit: rt.adaptiveLimit(rt.payloadLimit), JobsLimit: pickUpCount, @@ -1666,6 +1659,16 @@ func (rt *HandleT) getQueryParams(pickUpCount int) jobsdb.GetQueryParamsT { } } +func (rt *HandleT) parameterFilters() []jobsdb.ParameterFilterT { + if rt.destinationId != rt.destName { + return []jobsdb.ParameterFilterT{{ + Name: "destination_id", + Value: rt.destinationId, + }} + } + return nil +} + func (rt *HandleT) readAndProcess() int { //#JobOrder (See comment marked #JobOrder if rt.guaranteeUserEventOrder { @@ -1713,12 +1716,12 @@ func (rt *HandleT) readAndProcess() int { var statusList []*jobsdb.JobStatusT var toProcess []workerJobT - throttledUserMap := make(map[string]struct{}) + throttledOrderKeys := make(map[string]struct{}) // Identify jobs which can be processed for iterator.HasNext() { job := iterator.Next() - w := rt.findWorker(job, throttledUserMap) + w := rt.findWorker(job, throttledOrderKeys) if w != nil { status := jobsdb.JobStatusT{ JobID: job.JobID, @@ -1744,7 +1747,7 @@ func (rt *HandleT) readAndProcess() int { // Mark the jobs as executing err := misc.RetryWithNotify(context.Background(), rt.jobsDBCommandTimeout, rt.jobdDBMaxRetries, func(ctx context.Context) error { - return rt.jobsDB.UpdateJobStatus(ctx, statusList, []string{rt.destName}, nil) + return rt.jobsDB.UpdateJobStatus(ctx, statusList, []string{rt.destName}, rt.parameterFilters()) }, sendRetryUpdateStats) if err != nil { pkgLogger.Errorf("Error occurred while marking %s jobs statuses as executing. Panicking. Err: %v", rt.destName, err) diff --git a/router/router_throttling_test.go b/router/router_throttling_test.go index 870e739aea..1cffdd7bf8 100644 --- a/router/router_throttling_test.go +++ b/router/router_throttling_test.go @@ -219,7 +219,7 @@ func Test_RouterThrottling(t *testing.T) { verifyBucket := func(buckets map[int64]int, totalEvents, rps, burst, cost int) { lowerLengthRange := (totalEvents*cost - burst) / rps - upperLengthRange := lowerLengthRange + 1 + upperLengthRange := lowerLengthRange + 2 requireLengthInRange(t, buckets, lowerLengthRange, upperLengthRange) maxEventsPerBucket := rps / cost diff --git a/services/debugger/destination/eventDeliveryStatusUploader.go b/services/debugger/destination/eventDeliveryStatusUploader.go index b13473521f..87862f0ba7 100644 --- a/services/debugger/destination/eventDeliveryStatusUploader.go +++ b/services/debugger/destination/eventDeliveryStatusUploader.go @@ -68,7 +68,7 @@ func NewHandle(backendConfig backendconfig.BackendConfig, opts ...Opt) (Destinat h.uploader = debugger.New[*DeliveryStatusT](url, eventUploader) h.uploader.Start() - cacheType := cache.CacheType(config.GetInt("DestinationDebugger.cacheType", int(cache.BadgerCacheType))) + cacheType := cache.CacheType(config.GetInt("DestinationDebugger.cacheType", int(cache.MemoryCacheType))) h.eventsDeliveryCache, err = cache.New[*DeliveryStatusT](cacheType, "destination", h.log) if err != nil { return nil, err diff --git a/services/debugger/source/eventUploader.go b/services/debugger/source/eventUploader.go index 761af564c7..953ee50950 100644 --- a/services/debugger/source/eventUploader.go +++ b/services/debugger/source/eventUploader.go @@ -74,7 +74,7 @@ func NewHandle(backendConfig backendconfig.BackendConfig, opts ...Opt) (SourceDe h.uploader = debugger.New[*GatewayEventBatchT](url, eventUploader) h.uploader.Start() - cacheType := cache.CacheType(config.GetInt("SourceDebugger.cacheType", int(cache.BadgerCacheType))) + cacheType := cache.CacheType(config.GetInt("SourceDebugger.cacheType", int(cache.MemoryCacheType))) h.eventsCache, err = cache.New[[]byte](cacheType, "source", h.log) if err != nil { return nil, err diff --git a/services/debugger/transformation/transformationStatusUploader.go b/services/debugger/transformation/transformationStatusUploader.go index 274767941e..807b1b4562 100644 --- a/services/debugger/transformation/transformationStatusUploader.go +++ b/services/debugger/transformation/transformationStatusUploader.go @@ -109,7 +109,7 @@ func NewHandle(backendConfig backendconfig.BackendConfig, opts ...Opt) (Transfor url := fmt.Sprintf("%s/dataplane/eventTransformStatus", h.configBackendURL) transformationStatusUploader := &TransformationStatusUploader{} - cacheType := cache.CacheType(config.GetInt("TransformationDebugger.cacheType", int(cache.BadgerCacheType))) + cacheType := cache.CacheType(config.GetInt("TransformationDebugger.cacheType", int(cache.MemoryCacheType))) h.transformationCacheMap, err = cache.New[TransformationStatusT](cacheType, "transformation", h.log) if err != nil { return nil, err