Skip to content

Commit

Permalink
chore: master pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jul 5, 2023
2 parents dca3c26 + 9e7f117 commit a6e3e44
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/allisson/go-pglock/v2 v2.0.1
github.com/apache/pulsar-client-go v0.10.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/aws/aws-sdk-go v1.44.294
github.com/aws/aws-sdk-go v1.44.295
github.com/bugsnag/bugsnag-go/v2 v2.2.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -762,8 +762,8 @@ github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZve
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.43.31/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.294 h1:3x7GaEth+pDU9HwFcAU0awZlEix5CEdyIZvV08SlHa8=
github.com/aws/aws-sdk-go v1.44.294/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go v1.44.295 h1:SGjU1+MqttXfRiWHD6WU0DRhaanJgAFY+xIhEaugV8Y=
github.com/aws/aws-sdk-go v1.44.295/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
github.com/aws/aws-sdk-go-v2 v1.17.7 h1:CLSjnhJSTSogvqUGhIC6LqFKATMRexcxLZ0i/Nzk9Eg=
github.com/aws/aws-sdk-go-v2 v1.17.7/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
Expand Down
16 changes: 15 additions & 1 deletion router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package router
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/http"
Expand Down Expand Up @@ -233,7 +234,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
stats.Default.NewTaggedStat("router_iterator_stats_discarded_job_count", stats.CountType, stats.Tags{"destType": rt.destType, "partition": partition, "reason": err.Error()}).Increment()
iterator.Discard(job)
discardedCount++
if rt.isolationStrategy.StopIteration(err) {
if rt.stopIteration(err) {
break
}
}
Expand All @@ -257,6 +258,19 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
return
}

func (rt *Handle) stopIteration(err error) bool {
// if the context is cancelled, we can stop iteration
if errors.Is(err, types.ErrContextCancelled) {
return true
}
// if we are not guaranteeing user event order, we can stop iteration if there are no more slots available
if !rt.guaranteeUserEventOrder && errors.Is(err, types.ErrWorkerNoSlot) {
return true
}
// delegate to the isolation strategy for the final decision
return rt.isolationStrategy.StopIteration(err)
}

// commitStatusList commits the status of the jobs to the jobsDB
func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
reportMetrics := make([]*utilTypes.PUReportedMetric, 0)
Expand Down

0 comments on commit a6e3e44

Please sign in to comment.