feat: toggle event ordering for workspace/destination
Sidddddarth committed Feb 7, 2024
1 parent db9b6d9 commit 9e0031c
Showing 10 changed files with 255 additions and 124 deletions.
3 changes: 2 additions & 1 deletion router/eventorder_debugger_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/rudderlabs/rudder-go-kit/config"
 	"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
 	"github.com/rudderlabs/rudder-server/jobsdb"
+	"github.com/rudderlabs/rudder-server/router/internal/eventorder"
 	migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
 )

@@ -71,7 +72,7 @@ func TestEventOrderDebugInfo(t *testing.T) {
 	_, err = pgContainer.DB.Exec("UPDATE rt_jobs_1 SET created_at = $1", refTime)
 	require.NoError(t, err)

-	debugInfo := rt.eventOrderDebugInfo("user1:destination1")
+	debugInfo := rt.eventOrderDebugInfo(eventorder.BarrierKey{UserID: "user1", DestinationID: "destination1"})
 	require.Equal(t,
 		` | t_name| job_id| created_at| status_id| job_state| attempt| exec_time| error_code| parameters| error_response|
 | ---| ---| ---| ---| ---| ---| ---| ---| ---| ---|
28 changes: 23 additions & 5 deletions router/handle.go
@@ -110,6 +110,9 @@ type Handle struct {
 	startEnded chan struct{}
 	barrier    *eventorder.Barrier

+	eventOrderingDisabledForWorkspace   func(workspaceID string) bool
+	eventOrderingDisabledForDestination func(destinationID string) bool
+
 	limiter struct {
 		pickup    kitsync.Limiter
 		transform kitsync.Limiter

@@ -193,7 +196,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
 	var statusList []*jobsdb.JobStatusT
 	var reservedJobs []reservedJob
-	blockedOrderKeys := make(map[string]struct{})
+	blockedOrderKeys := make(map[eventorder.BarrierKey]struct{})

 	flushTime := time.Now()
 	shouldFlush := func() bool {

@@ -482,7 +485,15 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
 			if status != jobsdb.Failed.State {
 				orderKey := jobOrderKey(userID, gjson.GetBytes(resp.job.Parameters, "destination_id").String())
 				rt.logger.Debugf("EventOrder: [%d] job %d for key %s %s", worker.id, resp.status.JobID, orderKey, status)
-				if err := worker.barrier.StateChanged(orderKey, resp.status.JobID, status); err != nil {
+				if err := worker.barrier.StateChanged(
+					eventorder.BarrierKey{
+						UserID:        userID,
+						DestinationID: gjson.GetBytes(resp.job.Parameters, "destination_id").String(),
+						WorkspaceID:   resp.job.WorkspaceId,
+					},
+					resp.status.JobID,
+					status,
+				); err != nil {
 					panic(err)
 				}
 			}

@@ -522,7 +533,7 @@ type workerJobSlot struct {
 	drainReason string
 }

-func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerJobSlot, error) {
+func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[eventorder.BarrierKey]struct{}) (*workerJobSlot, error) {
 	if rt.backgroundCtx.Err() != nil {
 		return nil, types.ErrContextCancelled
 	}

@@ -533,14 +544,21 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
 		return nil, types.ErrParamsUnmarshal
 	}

-	orderKey := jobOrderKey(job.UserID, parameters.DestinationID)
+	orderKey := eventorder.BarrierKey{
+		UserID:        job.UserID,
+		DestinationID: parameters.DestinationID,
+		WorkspaceID:   job.WorkspaceId,
+	}

 	eventOrderingDisabled := !rt.guaranteeUserEventOrder
-	if !eventOrderingDisabled && rt.barrier.Disabled(orderKey) {
+	if (rt.guaranteeUserEventOrder && rt.barrier.Disabled(orderKey)) ||
+		(rt.eventOrderingDisabledForWorkspace(job.WorkspaceId) ||
+			rt.eventOrderingDisabledForDestination(parameters.DestinationID)) {
 		eventOrderingDisabled = true
 		stats.Default.NewTaggedStat("router_eventorder_key_disabled", stats.CountType, stats.Tags{
 			"destType":      rt.destType,
 			"destinationId": parameters.DestinationID,
+			"workspaceID":   job.WorkspaceId,
 		}).Increment()
 	}
 	abortedJob, abortReason := rt.drainOrRetryLimitReached(job) // if job's aborted, then send it to its worker right away
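The composite "userID:destinationID" string key is replaced throughout by the structured key; map-based bookkeeping like blockedOrderKeys keeps working because a Go struct whose fields are all comparable is itself comparable and usable as a map key. A minimal standalone sketch (the type is copied from this commit; the IDs are hypothetical):

package main

import "fmt"

// Standalone copy of the BarrierKey introduced by this commit.
type BarrierKey struct {
	DestinationID, UserID, WorkspaceID string
}

func main() {
	blockedOrderKeys := make(map[BarrierKey]struct{})
	key := BarrierKey{UserID: "u1", DestinationID: "d1", WorkspaceID: "w1"}
	blockedOrderKeys[key] = struct{}{}

	// Struct equality is field-by-field, so an identical literal finds the same entry.
	_, blocked := blockedOrderKeys[BarrierKey{UserID: "u1", DestinationID: "d1", WorkspaceID: "w1"}]
	fmt.Println(blocked) // true
}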
10 changes: 10 additions & 0 deletions router/handle_lifecycle.go
@@ -3,6 +3,7 @@ package router
 import (
 	"context"
 	"fmt"
+	"slices"
 	"strconv"
 	"sync"
 	"time"

@@ -136,6 +137,12 @@ func (rt *Handle) Setup(
 		panic(fmt.Errorf("resolving isolation strategy for mode %q: %w", isolationMode, err))
 	}

+	rt.eventOrderingDisabledForWorkspace = func(workspaceID string) bool {
+		return slices.Contains(config.GetStringSlice("Router.orderingDisabledWorkspaceIDs", nil), workspaceID)
+	}
+	rt.eventOrderingDisabledForDestination = func(destinationID string) bool {
+		return slices.Contains(config.GetStringSlice("Router.orderingDisabledDestinationIDs", nil), destinationID)
+	}
 	rt.barrier = eventorder.NewBarrier(eventorder.WithMetadata(map[string]string{
 		"destType": rt.destType,
 		"batching": strconv.FormatBool(rt.enableBatching),

@@ -146,6 +153,9 @@
 		eventorder.WithHalfEnabledStateDuration(rt.eventOrderHalfEnabledStateDuration),
 		eventorder.WithDrainConcurrencyLimit(rt.drainConcurrencyLimit),
 		eventorder.WithDebugInfoProvider(rt.eventOrderDebugInfo),
+		eventorder.WithOrderingDisabledCheckForBarrierKey(func(key eventorder.BarrierKey) bool {
+			return rt.eventOrderingDisabledForWorkspace(key.WorkspaceID) || rt.eventOrderingDisabledForDestination(key.DestinationID)
+		}),
 	)

 	ctx, cancel := context.WithCancel(context.Background())
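Both predicates re-read their config key on every call, so the toggles can be flipped at runtime rather than only at Setup. A minimal sketch of exercising one of them, assuming rudder-go-kit's package-level config API; the config.Set call and the workspace IDs here are illustrative, not part of the commit:

package main

import (
	"fmt"
	"slices"

	"github.com/rudderlabs/rudder-go-kit/config"
)

func main() {
	// Disable ordering for one workspace, as the new Setup code would observe it.
	config.Set("Router.orderingDisabledWorkspaceIDs", []string{"ws-1"})

	// Same closure shape as the one installed on the Handle in Setup above.
	eventOrderingDisabledForWorkspace := func(workspaceID string) bool {
		return slices.Contains(config.GetStringSlice("Router.orderingDisabledWorkspaceIDs", nil), workspaceID)
	}

	fmt.Println(eventOrderingDisabledForWorkspace("ws-1")) // true
	fmt.Println(eventOrderingDisabledForWorkspace("ws-2")) // false
}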
5 changes: 3 additions & 2 deletions router/handle_observability.go
@@ -9,6 +9,7 @@ import (
 	"github.com/rudderlabs/rudder-go-kit/sqlutil"
 	"github.com/rudderlabs/rudder-go-kit/stats"
 	"github.com/rudderlabs/rudder-server/jobsdb"
+	"github.com/rudderlabs/rudder-server/router/internal/eventorder"
 	"github.com/rudderlabs/rudder-server/services/diagnostics"
 	"github.com/rudderlabs/rudder-server/services/rsources"
 	. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck

@@ -137,13 +138,13 @@ func (rt *Handle) pipelineDelayStats(partition string, first, last *jobsdb.JobT)
 // eventOrderDebugInfo provides some debug information for the given orderKey in case of a panic.
 // Top 100 job statuses for the given orderKey are returned.
-func (rt *Handle) eventOrderDebugInfo(orderKey string) (res string) {
+func (rt *Handle) eventOrderDebugInfo(orderKey eventorder.BarrierKey) (res string) {
 	defer func() {
 		if r := recover(); r != nil {
 			res = fmt.Sprintf("panic in EventOrderDebugInfo: %v", r)
 		}
 	}()
-	userID, destinationID := parseJobOrderKey(orderKey)
+	userID, destinationID := orderKey.UserID, orderKey.DestinationID
 	if err := rt.jobsDB.WithTx(func(tx *Tx) error {
 		rows, err := tx.Query(`SELECT * FROM joborderlog($1, $2, 10) LIMIT 100`, destinationID, userID)
 		if err != nil {
44 changes: 33 additions & 11 deletions router/internal/eventorder/eventorder.go
@@ -51,16 +51,22 @@ func WithDrainConcurrencyLimit(drainLimit misc.ValueLoader[int]) OptFn {
 }

 // WithDebugInfoProvider sets the debug info provider for the barrier (used for debugging purposes in case an illegal job sequence is detected)
-func WithDebugInfoProvider(debugInfoProvider func(key string) string) OptFn {
+func WithDebugInfoProvider(debugInfoProvider func(key BarrierKey) string) OptFn {
 	return func(b *Barrier) {
 		b.debugInfo = debugInfoProvider
 	}
 }

+func WithOrderingDisabledCheckForBarrierKey(orderingDisabledForKey func(key BarrierKey) bool) OptFn {
+	return func(b *Barrier) {
+		b.orderingDisabledForKey = orderingDisabledForKey
+	}
+}
+
 // NewBarrier creates a new properly initialized Barrier
 func NewBarrier(fns ...OptFn) *Barrier {
 	b := &Barrier{
-		barriers:               make(map[string]*barrierInfo),
+		barriers:               make(map[BarrierKey]*barrierInfo),
 		metadata:               make(map[string]string),
 		eventOrderKeyThreshold: misc.SingleValueLoader(0),
 		disabledStateDuration:  misc.SingleValueLoader(10 * time.Minute),

@@ -88,7 +94,7 @@ func NewBarrier(fns ...OptFn) *Barrier {
 type Barrier struct {
 	mu       sync.RWMutex // mutex to synchronize concurrent access to the barrier's methods
 	queue    []command
-	barriers map[string]*barrierInfo
+	barriers map[BarrierKey]*barrierInfo
 	metadata map[string]string

 	eventOrderKeyThreshold misc.ValueLoader[int] // maximum number of concurrent jobs for a given key (0 means no threshold)

@@ -97,13 +103,23 @@ type Barrier struct {
 	drainLimit misc.ValueLoader[int] // maximum number of concurrent jobs to accept after a previously failed job has been aborted

-	debugInfo func(key string) string
+	debugInfo func(key BarrierKey) string
+
+	orderingDisabledForKey func(key BarrierKey) bool
 }

+type BarrierKey struct {
+	DestinationID, UserID, WorkspaceID string
+}
+
+func (bk *BarrierKey) String() string {
+	return fmt.Sprintf("%s:%s:%s", bk.WorkspaceID, bk.DestinationID, bk.UserID)
+}
+
 // Enter the barrier for this key and jobID. If there is not already a barrier for this key
 // returns true, otherwise false along with the previous failed jobID if this is the cause of the barrier.
 // Another scenario where a barrier might exist for a key is when the previous job has failed in an unrecoverable manner and the drain limiter is enabled.
-func (b *Barrier) Enter(key string, jobID int64) (accepted bool, previousFailedJobID *int64) {
+func (b *Barrier) Enter(key BarrierKey, jobID int64) (accepted bool, previousFailedJobID *int64) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 	barrier, ok := b.barriers[key]

@@ -125,6 +141,12 @@ func (b *Barrier) Enter(key string, jobID int64) (accepted bool, previousFailedJ
 		return true, nil
 	}

+	if b.orderingDisabledForKey != nil && b.orderingDisabledForKey(key) {
+		barrier.state = stateDisabled
+		barrier.stateTime = time.Now()
+		return true, nil
+	}
+
 	// if key threshold is reached, disable the barrier and accept the job
 	if barrier.ConcurrencyLimitReached(jobID, b.eventOrderKeyThreshold.Load()) {
 		b.barriers[key] = &barrierInfo{

@@ -171,7 +193,7 @@ func (b *Barrier) Enter(key string, jobID int64) (accepted bool, previousFailedJ
 // Leave the barrier for this key and jobID. Leave acts as an undo operation for Enter, i.e.
 // when a previously-entered job leaves the barrier it is as if this key and jobID didn't enter the barrier.
 // Calling Leave is idempotent.
-func (b *Barrier) Leave(key string, jobID int64) {
+func (b *Barrier) Leave(key BarrierKey, jobID int64) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 	// remove the job from the active limiters

@@ -184,7 +206,7 @@ func (b *Barrier) Leave(key string, jobID int64) {
 }

 // Peek returns the previously failed jobID for the given key, if any
-func (b *Barrier) Peek(key string) (previousFailedJobID *int64) {
+func (b *Barrier) Peek(key BarrierKey) (previousFailedJobID *int64) {
 	b.mu.RLock()
 	defer b.mu.RUnlock()
 	barrier, ok := b.barriers[key]

@@ -196,7 +218,7 @@ func (b *Barrier) Peek(key string) (previousFailedJobID *int64) {
 }

 // Wait returns true if the job for this key shouldn't continue, but wait (transition to a waiting state)
-func (b *Barrier) Wait(key string, jobID int64) (wait bool, previousFailedJobID *int64) {
+func (b *Barrier) Wait(key BarrierKey, jobID int64) (wait bool, previousFailedJobID *int64) {
 	b.mu.RLock()
 	defer b.mu.RUnlock()
 	barrier, ok := b.barriers[key]

@@ -227,7 +249,7 @@ func (b *Barrier) Wait(key string, jobID int64) (wait bool, previousFailedJobID
 // StateChanged must be called at the end, after the job state change has been persisted.
 // The only exception to this rule is when a job has failed in a retryable manner, in this scenario you should notify the barrier immediately after the failure.
 // An [ErrUnsupportedState] error will be returned if the state is not supported.
-func (b *Barrier) StateChanged(key string, jobID int64, state string) error {
+func (b *Barrier) StateChanged(key BarrierKey, jobID int64, state string) error {
 	b.mu.Lock()
 	defer b.mu.Unlock()

@@ -269,7 +291,7 @@ func (b *Barrier) Sync() int {
 }

 // Disabled returns [true] if the barrier is disabled for this key, [false] otherwise
-func (b *Barrier) Disabled(key string) bool {
+func (b *Barrier) Disabled(key BarrierKey) bool {
 	b.mu.RLock()
 	defer b.mu.RUnlock()
 	barrier, ok := b.barriers[key]

@@ -386,7 +408,7 @@ type command interface {
 }

 type cmd struct {
-	key   string
+	key   BarrierKey
 	jobID int64
 }
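Taken together, the barrier's calling convention with the new key type looks as follows; a sketch based only on the signatures above (the IDs are hypothetical, error handling is elided, and since eventorder is an internal package this only compiles inside the rudder-server module):

package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-server/jobsdb"
	"github.com/rudderlabs/rudder-server/router/internal/eventorder"
)

func main() {
	b := eventorder.NewBarrier(
		// New in this commit: ordering can be vetoed per key.
		eventorder.WithOrderingDisabledCheckForBarrierKey(func(key eventorder.BarrierKey) bool {
			return key.WorkspaceID == "ws-disabled" // hypothetical toggle
		}),
	)
	key := eventorder.BarrierKey{UserID: "u1", DestinationID: "d1", WorkspaceID: "ws-1"}

	if accepted, prevFailedJobID := b.Enter(key, 1); accepted {
		// ...deliver job 1, then report its terminal state back to the barrier.
		_ = b.StateChanged(key, 1, jobsdb.Succeeded.State)
	} else if prevFailedJobID != nil {
		// Ordering is being enforced: job 1 waits behind a previously failed job.
		fmt.Printf("job 1 blocked behind failed job %d\n", *prevFailedJobID)
	}
}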
12 changes: 6 additions & 6 deletions router/internal/eventorder/eventorder_simulation_test.go
@@ -110,16 +110,16 @@ func (g *generatorLoop) run() {
 				g.runtime.minJobID = job.id
 			}
 			// randomly drain 0.1% of non-previously failed jobs (previously failed jobs cannot be drained at this stage)
-			if previousFailedJobID := g.barrier.Peek(job.user); previousFailedJobID != nil && *previousFailedJobID != job.id && rand.Intn(1000) < 1 { // skipcq: GSC-G404
-				if err := g.barrier.StateChanged(job.user, job.id, jobsdb.Aborted.State); err != nil {
+			if previousFailedJobID := g.barrier.Peek(eventorder.BarrierKey{UserID: job.user}); previousFailedJobID != nil && *previousFailedJobID != job.id && rand.Intn(1000) < 1 { // skipcq: GSC-G404
+				if err := g.barrier.StateChanged(eventorder.BarrierKey{UserID: job.user}, job.id, jobsdb.Aborted.State); err != nil {
 					panic(fmt.Errorf("could not drain job:%d: %w", job.id, err))
 				}
 				g.drained = append(g.drained, job)
 				g.logger.Logf("drained job:%d", job.id)
 				continue
 			}

-			if accept, blockJobID := g.barrier.Enter(job.user, job.id); accept {
+			if accept, blockJobID := g.barrier.Enter(eventorder.BarrierKey{UserID: job.user}, job.id); accept {
 				if blockJobID != nil && *blockJobID > job.id {
 					panic(fmt.Errorf("job.JobID:%d < blockJobID:%d", job.id, *blockJobID))
 				}

@@ -199,7 +199,7 @@ func (wp *workerProcess) processJobs() {
 	for _, job := range wp.jobs {
 		// introduce some random delay during processing so that buffers don't empty at a steady pace
 		time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) // skipcq: GSC-G404
-		wait, previousFailedJobID := wp.barrier.Wait(job.user, job.id)
+		wait, previousFailedJobID := wp.barrier.Wait(eventorder.BarrierKey{UserID: job.user}, job.id)

 		if wait {
 			job.states = append([]string{jobsdb.Waiting.State}, job.states...)

@@ -215,7 +215,7 @@
 			continue
 		}
 		if job.states[0] == jobsdb.Failed.State {
-			_ = wp.barrier.StateChanged(job.user, job.id, jobsdb.Failed.State)
+			_ = wp.barrier.StateChanged(eventorder.BarrierKey{UserID: job.user}, job.id, jobsdb.Failed.State)
 		}
 		wp.out <- job
 	}

@@ -268,7 +268,7 @@ func (cl *commitStatusLoop) commit() {
 	time.Sleep(time.Duration(rand.Intn(2)) * time.Millisecond) // skipcq: GSC-G404
 	switch job.states[0] {
 	case "aborted", "succeeded", "waiting":
-		_ = cl.barrier.StateChanged(job.user, job.id, job.states[0])
+		_ = cl.barrier.StateChanged(eventorder.BarrierKey{UserID: job.user}, job.id, job.states[0])
 	}
 	if len(job.states) == 1 {
 		if job.states[0] != "succeeded" && job.states[0] != "aborted" {
