chore(router): include runtime information during event ordering errors (#2421)

To help with debugging the router's complex codebase when event ordering guarantees are being violated
atzoum committed Sep 14, 2022
1 parent 44f3786 commit 12d3e59
Showing 4 changed files with 66 additions and 35 deletions.
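
The gist of the change: a barrier can now carry arbitrary key/value metadata, and every ordering-violation panic interpolates that metadata together with the offending userID. A rough sketch of the intent, with hypothetical values mirroring the router.go hunk at the bottom of this commit:

barrier := eventorder.NewBarrier(
    eventorder.WithConcurrencyLimit(64),
    eventorder.WithMetadata(map[string]string{
        "destType":         "WEBHOOK",
        "batching":         "false",
        "transformerProxy": "false",
    }),
)
// A violation now panics with a message along the lines of:
//   detected illegal job sequence during barrier enter map[batching:false destType:WEBHOOK transformerProxy:false]: userID "u1", previousFailedJob:12 > jobID:7
_ = barrier
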
77 changes: 51 additions & 26 deletions router/internal/eventorder/eventorder.go
@@ -11,12 +11,32 @@ import (

var ErrUnsupportedState = errors.New("unsupported state")

type OptFn func(b *Barrier)

// WithMetadata includes the provided metadata in the error messages
func WithMetadata(metadata map[string]string) OptFn {
return func(b *Barrier) {
b.metadata = metadata
}
}

// WithConcurrencyLimit sets the maximum number of concurrent jobs for
// a given user when the limiter is enabled
func WithConcurrencyLimit(abortConcurrencyLimit int) OptFn {
return func(b *Barrier) {
b.concurrencyLimit = abortConcurrencyLimit
}
}

// NewBarrier creates a new properly initialized Barrier
func NewBarrier(abortConcurrencyLimit int) *Barrier {
return &Barrier{
concurrencyLimit: abortConcurrencyLimit,
barriers: make(map[string]*barrierInfo),
func NewBarrier(fns ...OptFn) *Barrier {
b := &Barrier{
barriers: make(map[string]*barrierInfo),
}
for _, fn := range fns {
fn(b)
}
return b
}
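
NewBarrier now follows the functional-options pattern: call sites that used to pass the abort concurrency limit positionally switch to WithConcurrencyLimit, and metadata can be attached independently. A minimal usage sketch (values hypothetical):

b := NewBarrier(
    WithConcurrencyLimit(2),
    WithMetadata(map[string]string{"destType": "WEBHOOK"}),
)
_ = b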

// Barrier is an abstraction for applying event ordering guarantees in the router.
@@ -36,6 +56,7 @@ type Barrier struct {
mu sync.RWMutex // mutex to synchronize concurrent access to the barrier's methods
queue []command
barriers map[string]*barrierInfo
metadata map[string]string
}

// Enter the barrier for this userID and jobID. If there is not already a barrier for this userID
@@ -52,6 +73,9 @@ func (b *Barrier) Enter(userID string, jobID int64) (accepted bool, previousFail
// if there is a failed job in the barrier, only this job can enter the barrier
if barrier.failedJobID != nil {
failedJob := *barrier.failedJobID
if failedJob > jobID {
panic(fmt.Errorf("detected illegal job sequence during barrier enter %+v: userID %q, previousFailedJob:%d > jobID:%d", b.metadata, userID, failedJob, jobID))
}
return jobID == failedJob, &failedJob
}
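
This new guard mirrors the one already present in Wait: within a user's barrier the previously failed job is expected to be the oldest pending job, so an Enter call with a smaller jobID means ordering was already broken upstream, and the panic now names the user and the barrier's metadata. A minimal in-package sketch of a sequence that would trip it, assuming the jobsdb state constants used by the existing tests:

b := NewBarrier(WithMetadata(map[string]string{"destType": "WEBHOOK"}))
b.Enter("user1", 2)                                 // accepted: no barrier exists for user1 yet
_ = b.StateChanged("user1", 2, jobsdb.Failed.State) // failed commands execute immediately; job 2 becomes the failed job
b.Enter("user1", 1)                                 // panics: previousFailedJob:2 > jobID:1, message includes metadata and userID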

@@ -85,7 +109,7 @@ func (b *Barrier) Wait(userID string, jobID int64) (wait bool, previousFailedJob
if barrier.failedJobID != nil {
failedJob := *barrier.failedJobID
if failedJob > jobID {
panic(fmt.Errorf("previousFailedJob:%d > jobID:%d", failedJob, jobID))
panic(fmt.Errorf("detected illegal job sequence during barrier wait %+v: userID %q, previousFailedJob:%d > jobID:%d", b.metadata, userID, failedJob, jobID))
}
return jobID > failedJob, &failedJob // wait if this is not the failed job
}
@@ -115,10 +139,10 @@ func (b *Barrier) StateChanged(userID string, jobID int64, state string) error {
return ErrUnsupportedState
}

if command.enqueue(b.barriers) {
if command.enqueue(b) {
b.queue = append(b.queue, command)
} else {
command.execute(b.barriers)
command.execute(b)
}
return nil
}
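
Commands now receive the whole *Barrier instead of just the barriers map, so they can include the metadata when reporting violations; the deferral semantics are unchanged: a command for a user that already has a barrier is queued until Sync runs, while a failed-job command always executes immediately. A rough sketch of that flow, with state constants assumed from the existing tests:

b := NewBarrier()
_ = b.StateChanged("user1", 1, jobsdb.Failed.State)  // failed commands never enqueue: runs immediately and creates user1's barrier
_ = b.StateChanged("user1", 2, jobsdb.Waiting.State) // user1 now has a barrier, so this command is deferred
flushed := b.Sync()                                  // executes the deferred command; flushed == 1 here
_ = flushed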
@@ -128,7 +152,7 @@ func (b *Barrier) Sync() int {
b.mu.Lock()
defer b.mu.Unlock()
for _, c := range b.queue {
c.execute(b.barriers)
c.execute(b)
}
flushed := len(b.queue)
b.queue = nil
@@ -147,7 +171,7 @@ func (b *Barrier) String() string {
var sb strings.Builder
b.mu.RLock()
defer b.mu.RUnlock()
sb.WriteString("Barrier{[")
sb.WriteString(fmt.Sprintf("Barrier{%+v[", b.metadata))
for userID, barrier := range b.barriers {
failedJobID := "<nil>"
if barrier.failedJobID != nil {
@@ -166,8 +190,8 @@ type barrierInfo struct {
}

type command interface {
enqueue(barriers map[string]*barrierInfo) bool
execute(barriers map[string]*barrierInfo)
enqueue(b *Barrier) bool
execute(b *Barrier)
}

type cmd struct {
@@ -176,17 +200,17 @@ type cmd struct {
}

// default behaviour is to try and remove the jobID from the concurrent jobs map
func (c *cmd) execute(barriers map[string]*barrierInfo) {
if barrier, ok := barriers[c.userID]; ok {
func (c *cmd) execute(b *Barrier) {
if barrier, ok := b.barriers[c.userID]; ok {
barrier.mu.Lock()
defer barrier.mu.Unlock()
delete(barrier.concurrencyLimiter, c.jobID)
}
}

// default behaviour is to enqueue the command if a barrier for this userID already exists
func (c *cmd) enqueue(barriers map[string]*barrierInfo) bool {
_, ok := barriers[c.userID]
func (c *cmd) enqueue(b *Barrier) bool {
_, ok := b.barriers[c.userID]
return ok
}

@@ -196,26 +220,27 @@ type jobFailedCommand struct {
}

// If no failed jobID is in the barrier make this jobID the failed job for this userID. Removes the job from the concurrent jobs map if it exists there
func (c *jobFailedCommand) execute(barriers map[string]*barrierInfo) {
barrier, ok := barriers[c.userID]
func (c *jobFailedCommand) execute(b *Barrier) {
barrier, ok := b.barriers[c.userID]
if !ok {
barrier = &barrierInfo{}
barriers[c.userID] = barrier
b.barriers[c.userID] = barrier
}
barrier.mu.Lock()
defer barrier.mu.Unlock()

// it is unfortunately possible within a single batch for events to be processed out-of-order
if barrier.failedJobID == nil {
barrier.failedJobID = &c.jobID
} else if *barrier.failedJobID > c.jobID {
panic(fmt.Errorf("previousFailedJob:%d > jobID:%d", *barrier.failedJobID, c.jobID))
panic(fmt.Errorf("detected illegal job sequence during barrier job failed %+v: userID %q, previousFailedJob:%d > jobID:%d", b.metadata, c.userID, *barrier.failedJobID, c.jobID))
}
// reset concurrency limiter
barrier.concurrencyLimiter = nil
}

// a failed command never gets enqueued
func (*jobFailedCommand) enqueue(_ map[string]*barrierInfo) bool {
func (*jobFailedCommand) enqueue(_ *Barrier) bool {
return false
}

@@ -225,13 +250,13 @@ type jobSucceededCmd struct {
}

// removes the barrier for this userID, if it exists
func (c *jobSucceededCmd) execute(barriers map[string]*barrierInfo) {
if barrier, ok := barriers[c.userID]; ok {
func (c *jobSucceededCmd) execute(b *Barrier) {
if barrier, ok := b.barriers[c.userID]; ok {
if barrier.failedJobID != nil && *barrier.failedJobID != c.jobID { // out-of-sync command (failed commands get executed immediately)
return
}
}
delete(barriers, c.userID)
delete(b.barriers, c.userID)
}

// jobAbortedCommand is a command that is executed when a job has aborted.
@@ -240,11 +265,11 @@ type jobAbortedCommand struct {
}

// Creates a concurrent jobs map if none exists. Also removes the jobID from the concurrent jobs map if it exists there
func (c *jobAbortedCommand) execute(barriers map[string]*barrierInfo) {
if barrier, ok := barriers[c.userID]; ok {
func (c *jobAbortedCommand) execute(b *Barrier) {
if barrier, ok := b.barriers[c.userID]; ok {
if barrier.failedJobID == nil {
// no previously failed job, simply remove the barrier
delete(barriers, c.userID)
delete(b.barriers, c.userID)
return
}
if *barrier.failedJobID != c.jobID {
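
For context, the simulation test below drives the barrier the way the router's generator/worker/status loops do; a condensed, hypothetical version of one iteration of that flow looks roughly like:

b := eventorder.NewBarrier(
    eventorder.WithConcurrencyLimit(2),
    eventorder.WithMetadata(map[string]string{"destType": "WEBHOOK"}),
)
if accepted, _ := b.Enter("user1", 7); accepted { // generator: hand the job to a worker only if the barrier accepts it
    if wait, _ := b.Wait("user1", 7); !wait { // worker: jobs queued behind a failed job are put back instead of sent
        // ... deliver the job to the destination ...
    }
    _ = b.StateChanged("user1", 7, jobsdb.Succeeded.State) // status loop: report the terminal state back to the barrier
}
_ = b.Sync() // periodically flush deferred commands
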
2 changes: 1 addition & 1 deletion router/internal/eventorder/eventorder_simulation_test.go
@@ -36,7 +36,7 @@ func TestSimulateBarrier(t *testing.T) {
defer cancel()

var logger log = t
barrier := eventorder.NewBarrier(2)
barrier := eventorder.NewBarrier(eventorder.WithConcurrencyLimit(2))
generator := &generatorLoop{ctx: ctx, barrier: barrier, batchSize: batchSize, pending: jobs, out: workerQueue, logger: logger}
worker := &workerProcess{ctx: ctx, barrier: barrier, in: workerQueue, out: statusQueue, logger: logger}
commit := &commitStatusLoop{ctx: ctx, barrier: barrier, in: statusQueue, putBack: generator.putBack, logger: logger}
12 changes: 6 additions & 6 deletions router/internal/eventorder/eventorder_test.go
@@ -8,7 +8,7 @@ import (
)

func Test_Job_Failed_Scenario(t *testing.T) {
barrier := NewBarrier(0)
barrier := NewBarrier(WithMetadata(map[string]string{"key1": "value1"}))

enter, previousFailedJobID := barrier.Enter("user1", 1)
require.True(t, enter, "job 1 for user1 should be accepted since no barrier exists")
@@ -25,7 +25,7 @@ func Test_Job_Failed_Scenario(t *testing.T) {

require.True(t, firstBool(barrier.Wait("user1", 2)), "job 2 for user1 should wait after job 1 has failed")
require.Equal(t, 1, barrier.Size(), "barrier should have size of 1")
require.Equal(t, `Barrier{[{userID: user1, failedJobID: 1, concurrentJobs: map[]}]}`, barrier.String(), "the barrier's string representation should be human readable")
require.Equal(t, `Barrier{map[key1:value1][{userID: user1, failedJobID: 1, concurrentJobs: map[]}]}`, barrier.String(), "the barrier's string representation should be human readable")
require.NoError(t, barrier.StateChanged("user1", 2, jobsdb.Waiting.State))

barrier.Sync()
@@ -51,7 +51,7 @@ func Test_Job_Failed_Scenario(t *testing.T) {
}

func Test_Job_Aborted_Scenario(t *testing.T) {
barrier := NewBarrier(1)
barrier := NewBarrier(WithConcurrencyLimit(1))

// Fail job 1 then enter again
enter, previousFailedJobID := barrier.Enter("user1", 1)
@@ -118,7 +118,7 @@ func Test_Job_Aborted_Scenario(t *testing.T) {
}

func Test_Job_Abort_then_Fail(t *testing.T) {
barrier := NewBarrier(2)
barrier := NewBarrier(WithConcurrencyLimit(2))

enter, previousFailedJobID := barrier.Enter("user1", 1)
require.True(t, enter, "job 1 for user1 should be accepted since no barrier exists")
@@ -156,7 +156,7 @@ func Test_Job_Abort_then_Fail(t *testing.T) {
}

func Test_Job_Fail_then_Abort(t *testing.T) {
barrier := NewBarrier(2)
barrier := NewBarrier(WithConcurrencyLimit(2))

enter, previousFailedJobID := barrier.Enter("user1", 1)
require.True(t, enter, "job 1 for user1 should be accepted since no barrier exists")
@@ -206,7 +206,7 @@ func Test_Job_Fail_then_Abort(t *testing.T) {
}

func Test_Panic_Scenarios(t *testing.T) {
barrier := NewBarrier(0)
barrier := NewBarrier()

enter, _ := barrier.Enter("user1", 2)
require.True(t, enter, "job 2 for user1 should be accepted since no barrier exists")
10 changes: 8 additions & 2 deletions router/router.go
@@ -1162,8 +1162,14 @@ func (rt *HandleT) initWorkers() {
g, _ := errgroup.WithContext(context.Background())
for i := 0; i < rt.noOfWorkers; i++ {
worker := &workerT{
channel: make(chan workerMessageT, noOfJobsPerChannel),
barrier: eventorder.NewBarrier(rt.allowAbortedUserJobsCountForProcessing),
channel: make(chan workerMessageT, noOfJobsPerChannel),
barrier: eventorder.NewBarrier(
eventorder.WithConcurrencyLimit(rt.allowAbortedUserJobsCountForProcessing),
eventorder.WithMetadata(map[string]string{
"destType": rt.destName,
"batching": strconv.FormatBool(rt.enableBatching),
"transformerProxy": strconv.FormatBool(rt.transformerProxy),
})),
retryForJobMap: make(map[int64]time.Time),
workerID: i,
failedJobs: 0,
