Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Feb 9, 2024
1 parent 00b53f2 commit 1170a58
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
60 changes: 53 additions & 7 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,13 @@ func TestBackoff(t *testing.T) {
maxFailedCountForJob: misc.SingleValueLoader(3),
retryTimeWindow: misc.SingleValueLoader(180 * time.Minute),
},
drainer: &drainer{},
throttlerFactory: &mockThrottlerFactory{count: new(atomic.Int64)},
eventOrderingDisabledForWorkspace: func(workspaceID string) bool { return false },
drainer: &drainer{},
throttlerFactory: &mockThrottlerFactory{count: new(atomic.Int64)},
eventOrderingDisabledForWorkspace: func(workspaceID string) bool {
return slices.Contains(conf.GetStringSlice("Router.orderingDisabledWorkspaceIDs", nil), workspaceID)
},
eventOrderingDisabledForDestination: func(destinationID string) bool {
return slices.Contains(conf.GetStringSlice("Router.orderingDisabledDestinationIDs", nil), "destination")
return slices.Contains(conf.GetStringSlice("Router.orderingDisabledDestinationIDs", nil), destinationID)
},
}
workers := []*worker{{
Expand Down Expand Up @@ -412,7 +414,7 @@ func TestBackoff(t *testing.T) {
require.ErrorIs(t, err, types.ErrJobOrderBlocked)
})

t.Run("job not blocked after event ordering is disabled", func(t *testing.T) {
t.Run("job not blocked after event ordering is disabled(destinationID level)", func(t *testing.T) {
r.guaranteeUserEventOrder = true
workers[0].inputReservations = 0
job := &jobsdb.JobT{
Expand All @@ -423,14 +425,58 @@ func TestBackoff(t *testing.T) {
AttemptNum: 1,
RetryTime: time.Now().Add(-1 * time.Hour),
},
WorkspaceId: "someWorkspace",
}
conf.Set("Router.orderingDisabledDestinationIDs", []string{"destination"})
slot, err := r.findWorkerSlot(context.Background(), workers, job, map[eventorder.BarrierKey]struct{}{{UserID: job.UserID, DestinationID: "destination"}: {}})
slot, err := r.findWorkerSlot(
context.Background(),
workers,
job,
map[eventorder.BarrierKey]struct{}{{UserID: job.UserID, DestinationID: "destination", WorkspaceID: job.WorkspaceId}: {}},
)
require.NoError(t, err)
require.NotNil(t, slot)

conf.Set("Router.orderingDisabledDestinationIDs", nil)
slot, err = r.findWorkerSlot(context.Background(), workers, job, map[eventorder.BarrierKey]struct{}{{UserID: job.UserID, DestinationID: "destination"}: {}})
slot, err = r.findWorkerSlot(
context.Background(),
workers,
job,
map[eventorder.BarrierKey]struct{}{{UserID: job.UserID, DestinationID: "destination", WorkspaceID: job.WorkspaceId}: {}})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrJobOrderBlocked)
})

t.Run("job not blocked after event ordering is disabled(workspaceID level)", func(t *testing.T) {
r.guaranteeUserEventOrder = true
workers[0].inputReservations = 0
job := &jobsdb.JobT{
JobID: 1,
Parameters: []byte(`{"destination_id": "destination"}`),
LastJobStatus: jobsdb.JobStatusT{
JobState: jobsdb.Failed.State,
AttemptNum: 1,
RetryTime: time.Now().Add(-1 * time.Hour),
},
WorkspaceId: "someWorkspace",
}
conf.Set("Router.orderingDisabledWorkspaceIDs", []string{"someWorkspace"})
slot, err := r.findWorkerSlot(
context.Background(),
workers,
job,
map[eventorder.BarrierKey]struct{}{{UserID: job.UserID, DestinationID: "destination", WorkspaceID: job.WorkspaceId}: {}},
)
require.NoError(t, err)
require.NotNil(t, slot)

conf.Set("Router.orderingDisabledWorkspaceIDs", nil)
slot, err = r.findWorkerSlot(
context.Background(),
workers,
job,
map[eventorder.BarrierKey]struct{}{{UserID: job.UserID, DestinationID: "destination", WorkspaceID: job.WorkspaceId}: {}},
)
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrJobOrderBlocked)
})
Expand Down
12 changes: 8 additions & 4 deletions router/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,11 @@ func (w *worker) prepareResponsesForJobs(destinationJob *types.DestinationJobT,
}

func (w *worker) canSendJobToDestination(failedJobOrderKeys map[eventorder.BarrierKey]struct{}, destinationJob *types.DestinationJobT) bool {
if !w.rt.guaranteeUserEventOrder {
destinationID := destinationJob.JobMetadataArray[0].DestinationID
workspaceID := destinationJob.JobMetadataArray[0].WorkspaceID
if !w.rt.guaranteeUserEventOrder ||
w.rt.eventOrderingDisabledForWorkspace(workspaceID) ||
w.rt.eventOrderingDisabledForDestination(destinationID) {
// if guaranteeUserEventOrder is false, letting the next jobs pass
return true
}
Expand All @@ -894,10 +898,10 @@ func (w *worker) canSendJobToDestination(failedJobOrderKeys map[eventorder.Barri
for i := range destinationJob.JobMetadataArray {
orderKey := eventorder.BarrierKey{
UserID: destinationJob.JobMetadataArray[i].UserID,
DestinationID: destinationJob.JobMetadataArray[i].DestinationID,
WorkspaceID: destinationJob.JobMetadataArray[i].WorkspaceID,
DestinationID: destinationID,
WorkspaceID: workspaceID,
}
if _, ok := failedJobOrderKeys[orderKey]; ok {
if _, ok := failedJobOrderKeys[orderKey]; ok && !w.barrier.Disabled(orderKey) {
return false
}
}
Expand Down

0 comments on commit 1170a58

Please sign in to comment.