fix: job ordering inconsistencies with router destination isolation (#…
atzoum committed Feb 23, 2023
1 parent 0742fd7 commit a77c382
Showing 8 changed files with 51 additions and 57 deletions.
5 changes: 2 additions & 3 deletions jobsdb/jobsdb.go
@@ -476,9 +476,8 @@ type journalOpPayloadT struct {
 }

 type ParameterFilterT struct {
-	Name     string
-	Value    string
-	Optional bool
+	Name  string
+	Value string
 }

 var dbInvalidJsonErrors = map[string]struct{}{
6 changes: 1 addition & 5 deletions jobsdb/jobsdb_utils.go
@@ -156,11 +156,7 @@ func constructParameterJSONQuery(alias string, parameterFilters []ParameterFilte
 	var allKeyValues, mandatoryKeyValues, opNullConditions []string
 	for _, parameter := range parameterFilters {
 		allKeyValues = append(allKeyValues, fmt.Sprintf(`%q:%q`, parameter.Name, parameter.Value))
-		if parameter.Optional {
-			opNullConditions = append(opNullConditions, fmt.Sprintf(`%q.parameters -> '%s' IS NULL`, alias, parameter.Name))
-		} else {
-			mandatoryKeyValues = append(mandatoryKeyValues, fmt.Sprintf(`%q:%q`, parameter.Name, parameter.Value))
-		}
+		mandatoryKeyValues = append(mandatoryKeyValues, fmt.Sprintf(`%q:%q`, parameter.Name, parameter.Value))
 	}
 	opQuery := ""
 	if len(opNullConditions) > 0 {
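
With the Optional flag gone, every parameter filter contributes a mandatory "name":"value" pair to the condition that constructParameterJSONQuery builds, and the IS NULL branch for optional filters disappears. A rough, standalone illustration of the fragment the simplified loop now produces (not repository code; the destination ID is hypothetical):

package main

import (
	"fmt"
	"strings"
)

type ParameterFilterT struct {
	Name  string
	Value string
}

func main() {
	// One mandatory pair per filter; the formatting mirrors the %q:%q Sprintf above.
	filters := []ParameterFilterT{{Name: "destination_id", Value: "dest-123"}}
	pairs := make([]string, 0, len(filters))
	for _, f := range filters {
		pairs = append(pairs, fmt.Sprintf("%q:%q", f.Name, f.Value))
	}
	fmt.Println("{" + strings.Join(pairs, ",") + "}") // {"destination_id":"dest-123"}
}
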
20 changes: 8 additions & 12 deletions router/batchrouter/batchrouter.go
@@ -315,9 +315,8 @@ func (brt *HandleT) pollAsyncStatus(ctx context.Context) {
 	parameterFilters := make([]jobsdb.ParameterFilterT, 0)
 	for _, param := range jobsdb.CacheKeyParameterFilters {
 		parameterFilter := jobsdb.ParameterFilterT{
-			Name:     param,
-			Value:    key,
-			Optional: false,
+			Name:  param,
+			Value: key,
 		}
 		parameterFilters = append(parameterFilters, parameterFilter)
 	}
@@ -1314,9 +1313,8 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc
 	if readPerDestination {
 		parameterFilters = []jobsdb.ParameterFilterT{
 			{
-				Name:     "destination_id",
-				Value:    batchJobs.BatchDestination.Destination.ID,
-				Optional: false,
+				Name:  "destination_id",
+				Value: batchJobs.BatchDestination.Destination.ID,
 			},
 		}
 	}
@@ -1467,9 +1465,8 @@ func (brt *HandleT) setMultipleJobStatus(asyncOutput asyncdestinationmanager.Asy

 	parameterFilters := []jobsdb.ParameterFilterT{
 		{
-			Name:     "destination_id",
-			Value:    asyncOutput.DestinationID,
-			Optional: false,
+			Name:  "destination_id",
+			Value: asyncOutput.DestinationID,
 		},
 	}

@@ -1593,9 +1590,8 @@ func (worker *workerT) constructParameterFilters(batchDest router_utils.BatchDes
 	parameterFilters := make([]jobsdb.ParameterFilterT, 0)
 	for _, key := range jobsdb.CacheKeyParameterFilters {
 		parameterFilter := jobsdb.ParameterFilterT{
-			Name:     key,
-			Value:    worker.getValueForParameter(batchDest, key),
-			Optional: false,
+			Name:  key,
+			Value: worker.getValueForParameter(batchDest, key),
 		}
 		parameterFilters = append(parameterFilters, parameterFilter)
 	}
69 changes: 36 additions & 33 deletions router/router.go
@@ -400,8 +400,8 @@ func (worker *workerT) workerProcess() {

 	// mark job as waiting if prev job from same user has not succeeded yet
 	worker.rt.logger.Debugf(
-		"[%v Router] :: skipping processing job for userID: %v since prev failed job exists, prev id %v, current id %v",
-		worker.rt.destName, userID, previousFailedJobID, job.JobID,
+		"[%v Router] :: skipping processing job for orderKey: %v since prev failed job exists, prev id %v, current id %v",
+		worker.rt.destName, orderKey, previousFailedJobID, job.JobID,
 	)
 	resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "blocking_id", *previousFailedJobID)
 	resp = misc.UpdateJSONWithNewKeyVal(resp, "user_id", userID)
@@ -1200,48 +1200,46 @@ func (rt *HandleT) stopWorkers() {
 	}
 }

-func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledUserMap map[string]struct{}) (toSendWorker *workerT) {
+func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledOrderKeys map[string]struct{}) (toSendWorker *workerT) {
 	if rt.backgroundCtx.Err() != nil {
 		return
 	}

-	// checking if this job can be throttled
 	var parameters JobParametersT
-	userID := job.UserID
-
-	// checking if the user is in throttledMap. If yes, returning nil.
-	// this check is done to maintain order.
-	if _, ok := throttledUserMap[userID]; ok && rt.guaranteeUserEventOrder {
-		rt.logger.Debugf(`[%v Router] :: Skipping processing of job:%d of user:%s as user has earlier jobs in throttled map`, rt.destName, job.JobID, userID)
-		return nil
-	}
-
 	err := json.Unmarshal(job.Parameters, &parameters)
 	if err != nil {
 		rt.logger.Errorf(`[%v Router] :: Unmarshalling parameters failed with the error %v . Returning nil worker`, err)
 		return
 	}
+	orderKey := jobOrderKey(job.UserID, parameters.DestinationID)
+
+	// checking if the orderKey is in throttledMap. If yes, returning nil.
+	// this check is done to maintain order.
+	if _, ok := throttledOrderKeys[orderKey]; ok {
+		rt.logger.Debugf(`[%v Router] :: Skipping processing of job:%d of orderKey:%s as orderKey has earlier jobs in throttled map`, rt.destName, job.JobID, orderKey)
+		return nil
+	}

 	if !rt.guaranteeUserEventOrder {
 		// if guaranteeUserEventOrder is false, assigning worker randomly and returning here.
-		if rt.shouldThrottle(job, parameters, throttledUserMap) {
+		if rt.shouldThrottle(job, parameters) {
 			return
 		}
 		toSendWorker = rt.workers[rand.Intn(rt.noOfWorkers)] // skipcq: GSC-G404
 		return
 	}

 	//#JobOrder (see other #JobOrder comment)
-	index := rt.getWorkerPartition(userID)
+	index := rt.getWorkerPartition(orderKey)
 	worker := rt.workers[index]
 	if worker.canBackoff(job) {
 		return
 	}
-	orderKey := jobOrderKey(userID, parameters.DestinationID)
 	enter, previousFailedJobID := worker.barrier.Enter(orderKey, job.JobID)
 	if enter {
-		rt.logger.Debugf("EventOrder: job %d of user %s is allowed to be processed", job.JobID, userID)
-		if rt.shouldThrottle(job, parameters, throttledUserMap) {
+		rt.logger.Debugf("EventOrder: job %d of orderKey %s is allowed to be processed", job.JobID, orderKey)
+		if rt.shouldThrottle(job, parameters) {
+			throttledOrderKeys[orderKey] = struct{}{}
 			worker.barrier.Leave(orderKey, job.JobID)
 			return
 		}
@@ -1252,7 +1250,7 @@
 		if previousFailedJobID != nil {
 			previousFailedJobIDStr = strconv.FormatInt(*previousFailedJobID, 10)
 		}
-		rt.logger.Debugf("EventOrder: job %d of user %s is blocked (previousFailedJobID: %s)", job.JobID, userID, previousFailedJobIDStr)
+		rt.logger.Debugf("EventOrder: job %d of orderKey %s is blocked (previousFailedJobID: %s)", job.JobID, orderKey, previousFailedJobIDStr)
 		return nil
 		//#EndJobOrder
 	}
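
jobOrderKey itself is not part of this diff; presumably it combines the user ID with the destination ID taken from the job parameters, roughly along the lines of the following sketch (separator and exact shape are assumptions):

package main

import "fmt"

// Hypothetical sketch, not the repository's implementation. Keying on both IDs
// means the barrier, the throttling bookkeeping and the worker partitioning all
// operate per (user, destination) pair instead of per user.
func jobOrderKey(userID, destinationID string) string {
	return fmt.Sprintf("%s:%s", userID, destinationID)
}

func main() {
	fmt.Println(jobOrderKey("user-1", "dest-123")) // user-1:dest-123
}
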
@@ -1268,11 +1266,11 @@ func (worker *workerT) canBackoff(job *jobsdb.JobT) (shouldBackoff bool) {
 	return false
 }

-func (rt *HandleT) getWorkerPartition(userID string) int {
-	return misc.GetHash(userID) % rt.noOfWorkers
+func (rt *HandleT) getWorkerPartition(key string) int {
+	return misc.GetHash(key) % rt.noOfWorkers
 }

-func (rt *HandleT) shouldThrottle(job *jobsdb.JobT, parameters JobParametersT, throttledUserMap map[string]struct{}) (
+func (rt *HandleT) shouldThrottle(job *jobsdb.JobT, parameters JobParametersT) (
 	limited bool,
 ) {
 	if rt.throttlerFactory == nil {
@@ -1295,7 +1293,6 @@ func (rt *HandleT) shouldThrottle(job *jobsdb.JobT, parameters JobParametersT, t
 		return false
 	}
 	if limited {
-		throttledUserMap[job.UserID] = struct{}{}
 		rt.throttledStat.Count(1)
 		rt.logger.Debugf(
 			"[%v Router] :: Skipping processing of job:%d of user:%s as throttled limits exceeded",
@@ -1430,7 +1427,7 @@ func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) {
 	// Update the status
 	err := misc.RetryWithNotify(context.Background(), rt.jobsDBCommandTimeout, rt.jobdDBMaxRetries, func(ctx context.Context) error {
 		return rt.jobsDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error {
-			err := rt.jobsDB.UpdateJobStatusInTx(ctx, tx, statusList, []string{rt.destName}, nil)
+			err := rt.jobsDB.UpdateJobStatusInTx(ctx, tx, statusList, []string{rt.destName}, rt.parameterFilters())
 			if err != nil {
 				return fmt.Errorf("updating %s jobs statuses: %w", rt.destName, err)
 			}
@@ -1648,12 +1645,8 @@ func (rt *HandleT) generatorLoop(ctx context.Context) {
 func (rt *HandleT) getQueryParams(pickUpCount int) jobsdb.GetQueryParamsT {
 	if rt.destinationId != rt.destName {
 		return jobsdb.GetQueryParamsT{
-			CustomValFilters: []string{rt.destName},
-			ParameterFilters: []jobsdb.ParameterFilterT{{
-				Name:     "destination_id",
-				Value:    rt.destinationId,
-				Optional: false,
-			}},
+			CustomValFilters: []string{rt.destName},
+			ParameterFilters: rt.parameterFilters(),
 			IgnoreCustomValFiltersInQuery: true,
 			PayloadSizeLimit: rt.adaptiveLimit(rt.payloadLimit),
 			JobsLimit: pickUpCount,
@@ -1666,6 +1659,16 @@ func (rt *HandleT) getQueryParams(pickUpCount int) jobsdb.GetQueryParamsT {
 	}
 }

+func (rt *HandleT) parameterFilters() []jobsdb.ParameterFilterT {
+	if rt.destinationId != rt.destName {
+		return []jobsdb.ParameterFilterT{{
+			Name:  "destination_id",
+			Value: rt.destinationId,
+		}}
+	}
+	return nil
+}
+
 func (rt *HandleT) readAndProcess() int {
 	//#JobOrder (See comment marked #JobOrder
 	if rt.guaranteeUserEventOrder {
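
The new parameterFilters helper centralizes destination isolation: a handle bound to a single destination ID returns a destination_id filter, while a handle serving the whole destination type returns nil, and the same value now reaches both the job reads built by getQueryParams and the status updates below. A minimal sketch of the two cases, assumed to run inside the router package (the IDs are illustrative only):

// isolated handle: reads and status updates are scoped to one destination
isolated := &HandleT{destName: "WEBHOOK", destinationId: "dest-123"}
fmt.Printf("%+v\n", isolated.parameterFilters()) // [{Name:destination_id Value:dest-123}]

// shared handle: destinationId equals the destination type, so no extra filter
shared := &HandleT{destName: "WEBHOOK", destinationId: "WEBHOOK"}
fmt.Printf("%+v\n", shared.parameterFilters()) // [] (nil, no extra filtering)
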
@@ -1713,12 +1716,12 @@

 	var statusList []*jobsdb.JobStatusT
 	var toProcess []workerJobT
-	throttledUserMap := make(map[string]struct{})
+	throttledOrderKeys := make(map[string]struct{})

 	// Identify jobs which can be processed
 	for iterator.HasNext() {
 		job := iterator.Next()
-		w := rt.findWorker(job, throttledUserMap)
+		w := rt.findWorker(job, throttledOrderKeys)
 		if w != nil {
 			status := jobsdb.JobStatusT{
 				JobID: job.JobID,
@@ -1744,7 +1747,7 @@

 	// Mark the jobs as executing
 	err := misc.RetryWithNotify(context.Background(), rt.jobsDBCommandTimeout, rt.jobdDBMaxRetries, func(ctx context.Context) error {
-		return rt.jobsDB.UpdateJobStatus(ctx, statusList, []string{rt.destName}, nil)
+		return rt.jobsDB.UpdateJobStatus(ctx, statusList, []string{rt.destName}, rt.parameterFilters())
 	}, sendRetryUpdateStats)
 	if err != nil {
 		pkgLogger.Errorf("Error occurred while marking %s jobs statuses as executing. Panicking. Err: %v", rt.destName, err)
2 changes: 1 addition & 1 deletion router/router_throttling_test.go
@@ -219,7 +219,7 @@ func Test_RouterThrottling(t *testing.T) {

 	verifyBucket := func(buckets map[int64]int, totalEvents, rps, burst, cost int) {
 		lowerLengthRange := (totalEvents*cost - burst) / rps
-		upperLengthRange := lowerLengthRange + 1
+		upperLengthRange := lowerLengthRange + 2
 		requireLengthInRange(t, buckets, lowerLengthRange, upperLengthRange)

 		maxEventsPerBucket := rps / cost
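
The widened upper bound simply allows one extra bucket of slack. As a worked example with hypothetical values totalEvents=100, cost=1, burst=20 and rps=10, lowerLengthRange = (100*1 - 20) / 10 = 8, so the accepted bucket count grows from 8..9 to 8..10.
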
@@ -68,7 +68,7 @@ func NewHandle(backendConfig backendconfig.BackendConfig, opts ...Opt) (Destinat
 	h.uploader = debugger.New[*DeliveryStatusT](url, eventUploader)
 	h.uploader.Start()

-	cacheType := cache.CacheType(config.GetInt("DestinationDebugger.cacheType", int(cache.BadgerCacheType)))
+	cacheType := cache.CacheType(config.GetInt("DestinationDebugger.cacheType", int(cache.MemoryCacheType)))
 	h.eventsDeliveryCache, err = cache.New[*DeliveryStatusT](cacheType, "destination", h.log)
 	if err != nil {
 		return nil, err
2 changes: 1 addition & 1 deletion services/debugger/source/eventUploader.go
@@ -74,7 +74,7 @@ func NewHandle(backendConfig backendconfig.BackendConfig, opts ...Opt) (SourceDe
 	h.uploader = debugger.New[*GatewayEventBatchT](url, eventUploader)
 	h.uploader.Start()

-	cacheType := cache.CacheType(config.GetInt("SourceDebugger.cacheType", int(cache.BadgerCacheType)))
+	cacheType := cache.CacheType(config.GetInt("SourceDebugger.cacheType", int(cache.MemoryCacheType)))
 	h.eventsCache, err = cache.New[[]byte](cacheType, "source", h.log)
 	if err != nil {
 		return nil, err
@@ -109,7 +109,7 @@ func NewHandle(backendConfig backendconfig.BackendConfig, opts ...Opt) (Transfor
 	url := fmt.Sprintf("%s/dataplane/eventTransformStatus", h.configBackendURL)
 	transformationStatusUploader := &TransformationStatusUploader{}

-	cacheType := cache.CacheType(config.GetInt("TransformationDebugger.cacheType", int(cache.BadgerCacheType)))
+	cacheType := cache.CacheType(config.GetInt("TransformationDebugger.cacheType", int(cache.MemoryCacheType)))
 	h.transformationCacheMap, err = cache.New[TransformationStatusT](cacheType, "transformation", h.log)
 	if err != nil {
 		return nil, err
