Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: toggle event ordering for workspace/destination #4278

Merged
merged 4 commits
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion router/eventorder_debugger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/internal/eventorder"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
)

Expand Down Expand Up @@ -71,7 +72,7 @@ func TestEventOrderDebugInfo(t *testing.T) {
_, err = pgContainer.DB.Exec("UPDATE rt_jobs_1 SET created_at = $1", refTime)
require.NoError(t, err)

debugInfo := rt.eventOrderDebugInfo("user1:destination1")
debugInfo := rt.eventOrderDebugInfo(eventorder.BarrierKey{UserID: "user1", DestinationID: "destination1"})
require.Equal(t,
` | t_name| job_id| created_at| status_id| job_state| attempt| exec_time| error_code| parameters| error_response|
| ---| ---| ---| ---| ---| ---| ---| ---| ---| ---|
Expand Down
32 changes: 26 additions & 6 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type Handle struct {
startEnded chan struct{}
barrier *eventorder.Barrier

eventOrderingDisabledForWorkspace func(workspaceID string) bool
eventOrderingDisabledForDestination func(destinationID string) bool

limiter struct {
pickup kitsync.Limiter
transform kitsync.Limiter
Expand Down Expand Up @@ -193,7 +196,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke

var statusList []*jobsdb.JobStatusT
var reservedJobs []reservedJob
blockedOrderKeys := make(map[string]struct{})
blockedOrderKeys := make(map[eventorder.BarrierKey]struct{})

flushTime := time.Now()
shouldFlush := func() bool {
Expand Down Expand Up @@ -480,8 +483,18 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
userID := resp.userID
worker := resp.worker
if status != jobsdb.Failed.State {
orderKey := jobOrderKey(userID, gjson.GetBytes(resp.job.Parameters, "destination_id").String())
rt.logger.Debugf("EventOrder: [%d] job %d for key %s %s", worker.id, resp.status.JobID, orderKey, status)
orderKey := eventorder.BarrierKey{
UserID: userID,
DestinationID: gjson.GetBytes(resp.job.Parameters, "destination_id").String(),
WorkspaceID: resp.job.WorkspaceId,
}
rt.logger.Debugw(
"EventOrder",
"worker#", worker.id,
"jobID", resp.status.JobID,
"key", orderKey.String(),
"jobState", status,
)
if err := worker.barrier.StateChanged(orderKey, resp.status.JobID, status); err != nil {
panic(err)
}
Expand Down Expand Up @@ -522,7 +535,7 @@ type workerJobSlot struct {
drainReason string
}

func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerJobSlot, error) {
func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[eventorder.BarrierKey]struct{}) (*workerJobSlot, error) {
if rt.backgroundCtx.Err() != nil {
return nil, types.ErrContextCancelled
}
Expand All @@ -533,14 +546,21 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
return nil, types.ErrParamsUnmarshal
}

orderKey := jobOrderKey(job.UserID, parameters.DestinationID)
orderKey := eventorder.BarrierKey{
UserID: job.UserID,
DestinationID: parameters.DestinationID,
WorkspaceID: job.WorkspaceId,
}

eventOrderingDisabled := !rt.guaranteeUserEventOrder
if !eventOrderingDisabled && rt.barrier.Disabled(orderKey) {
if (rt.guaranteeUserEventOrder && rt.barrier.Disabled(orderKey)) ||
(rt.eventOrderingDisabledForWorkspace(job.WorkspaceId) ||
rt.eventOrderingDisabledForDestination(parameters.DestinationID)) {
eventOrderingDisabled = true
stats.Default.NewTaggedStat("router_eventorder_key_disabled", stats.CountType, stats.Tags{
"destType": rt.destType,
"destinationId": parameters.DestinationID,
"workspaceID": job.WorkspaceId,
}).Increment()
}
abortedJob, abortReason := rt.drainOrRetryLimitReached(job) // if job's aborted, then send it to its worker right away
Expand Down
10 changes: 10 additions & 0 deletions router/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package router
import (
"context"
"fmt"
"slices"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -136,6 +137,12 @@ func (rt *Handle) Setup(
panic(fmt.Errorf("resolving isolation strategy for mode %q: %w", isolationMode, err))
}

rt.eventOrderingDisabledForWorkspace = func(workspaceID string) bool {
return slices.Contains(config.GetStringSlice("Router.orderingDisabledWorkspaceIDs", nil), workspaceID)
}
rt.eventOrderingDisabledForDestination = func(destinationID string) bool {
return slices.Contains(config.GetStringSlice("Router.orderingDisabledDestinationIDs", nil), destinationID)
}
rt.barrier = eventorder.NewBarrier(eventorder.WithMetadata(map[string]string{
"destType": rt.destType,
"batching": strconv.FormatBool(rt.enableBatching),
Expand All @@ -146,6 +153,9 @@ func (rt *Handle) Setup(
eventorder.WithHalfEnabledStateDuration(rt.eventOrderHalfEnabledStateDuration),
eventorder.WithDrainConcurrencyLimit(rt.drainConcurrencyLimit),
eventorder.WithDebugInfoProvider(rt.eventOrderDebugInfo),
eventorder.WithOrderingDisabledCheckForBarrierKey(func(key eventorder.BarrierKey) bool {
return rt.eventOrderingDisabledForWorkspace(key.WorkspaceID) || rt.eventOrderingDisabledForDestination(key.DestinationID)
}),
)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
5 changes: 3 additions & 2 deletions router/handle_observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/sqlutil"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/internal/eventorder"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/services/rsources"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
Expand Down Expand Up @@ -137,13 +138,13 @@ func (rt *Handle) pipelineDelayStats(partition string, first, last *jobsdb.JobT)

// eventOrderDebugInfo provides some debug information for the given orderKey in case of a panic.
// Top 100 job statuses for the given orderKey are returned.
func (rt *Handle) eventOrderDebugInfo(orderKey string) (res string) {
func (rt *Handle) eventOrderDebugInfo(orderKey eventorder.BarrierKey) (res string) {
defer func() {
if r := recover(); r != nil {
res = fmt.Sprintf("panic in EventOrderDebugInfo: %v", r)
}
}()
userID, destinationID := parseJobOrderKey(orderKey)
userID, destinationID := orderKey.UserID, orderKey.DestinationID
if err := rt.jobsDB.WithTx(func(tx *Tx) error {
rows, err := tx.Query(`SELECT * FROM joborderlog($1, $2, 10) LIMIT 100`, destinationID, userID)
if err != nil {
Expand Down
52 changes: 36 additions & 16 deletions router/internal/eventorder/eventorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,22 @@ func WithDrainConcurrencyLimit(drainLimit misc.ValueLoader[int]) OptFn {
}

// WithDebugInfoProvider sets the debug info provider for the barrier (used for debugging purposes in case an illegal job sequence is detected)
func WithDebugInfoProvider(debugInfoProvider func(key string) string) OptFn {
func WithDebugInfoProvider(debugInfoProvider func(key BarrierKey) string) OptFn {
return func(b *Barrier) {
b.debugInfo = debugInfoProvider
}
}

// WithOrderingDisabledCheckForBarrierKey sets a callback the barrier uses to ask
// whether event ordering has been explicitly disabled for a given key
// (e.g. through workspace- or destination-level configuration).
func WithOrderingDisabledCheckForBarrierKey(orderingDisabledForKey func(key BarrierKey) bool) OptFn {
	return func(b *Barrier) {
		b.orderingDisabledForKey = orderingDisabledForKey
	}
}

// NewBarrier creates a new properly initialized Barrier
func NewBarrier(fns ...OptFn) *Barrier {
b := &Barrier{
barriers: make(map[string]*barrierInfo),
barriers: make(map[BarrierKey]*barrierInfo),
metadata: make(map[string]string),
eventOrderKeyThreshold: misc.SingleValueLoader(0),
disabledStateDuration: misc.SingleValueLoader(10 * time.Minute),
Expand Down Expand Up @@ -88,7 +94,7 @@ func NewBarrier(fns ...OptFn) *Barrier {
type Barrier struct {
mu sync.RWMutex // mutex to synchronize concurrent access to the barrier's methods
queue []command
barriers map[string]*barrierInfo
barriers map[BarrierKey]*barrierInfo
metadata map[string]string

eventOrderKeyThreshold misc.ValueLoader[int] // maximum number of concurrent jobs for a given key (0 means no threshold)
Expand All @@ -97,13 +103,23 @@ type Barrier struct {

drainLimit misc.ValueLoader[int] // maximum number of concurrent jobs to accept after a previously failed job has been aborted

debugInfo func(key string) string
debugInfo func(key BarrierKey) string

orderingDisabledForKey func(key BarrierKey) bool
}

// BarrierKey uniquely identifies a barrier: jobs sharing the same
// workspace, destination and user are subject to the same ordering barrier.
type BarrierKey struct {
	DestinationID, UserID, WorkspaceID string
}

// String returns the key in "<workspaceID>:<destinationID>:<userID>" format.
// Value receiver (instead of pointer) so that BarrierKey values — which are
// used directly as map keys — satisfy fmt.Stringer as well.
func (bk BarrierKey) String() string {
	return fmt.Sprintf("%s:%s:%s", bk.WorkspaceID, bk.DestinationID, bk.UserID)
}

// Enter the barrier for this key and jobID. If there is not already a barrier for this key
// returns true, otherwise false along with the previous failed jobID if this is the cause of the barrier.
// Another scenario where a barrier might exist for a key is when the previous job has failed in an unrecoverable manner and the drain limiter is enabled.
func (b *Barrier) Enter(key string, jobID int64) (accepted bool, previousFailedJobID *int64) {
func (b *Barrier) Enter(key BarrierKey, jobID int64) (accepted bool, previousFailedJobID *int64) {
b.mu.Lock()
defer b.mu.Unlock()
barrier, ok := b.barriers[key]
Expand All @@ -118,15 +134,15 @@ func (b *Barrier) Enter(key string, jobID int64) (accepted bool, previousFailedJ
b.barriers[key] = barrier
}

b.updateState(barrier)
b.updateState(barrier, key)

// if the barrier is in a disabled state, accept the job
if barrier.state == stateDisabled {
return true, nil
}

// if key threshold is reached, disable the barrier and accept the job
if barrier.ConcurrencyLimitReached(jobID, b.eventOrderKeyThreshold.Load()) {
if barrier.concurrencyLimitReached(jobID, b.eventOrderKeyThreshold.Load()) {
b.barriers[key] = &barrierInfo{
state: stateDisabled,
stateTime: time.Now(),
Expand Down Expand Up @@ -171,7 +187,7 @@ func (b *Barrier) Enter(key string, jobID int64) (accepted bool, previousFailedJ
// Leave the barrier for this key and jobID. Leave acts as an undo operation for Enter, i.e.
// when a previously-entered job leaves the barrier it is as if this key and jobID didn't enter the barrier.
// Calling Leave is idempotent.
func (b *Barrier) Leave(key string, jobID int64) {
func (b *Barrier) Leave(key BarrierKey, jobID int64) {
b.mu.Lock()
defer b.mu.Unlock()
// remove the job from the active limiters
Expand All @@ -184,7 +200,7 @@ func (b *Barrier) Leave(key string, jobID int64) {
}

// Peek returns the previously failed jobID for the given key, if any
func (b *Barrier) Peek(key string) (previousFailedJobID *int64) {
func (b *Barrier) Peek(key BarrierKey) (previousFailedJobID *int64) {
b.mu.RLock()
defer b.mu.RUnlock()
barrier, ok := b.barriers[key]
Expand All @@ -196,7 +212,7 @@ func (b *Barrier) Peek(key string) (previousFailedJobID *int64) {
}

// Wait returns true if the job for this key shouldn't continue, but wait (transition to a waiting state)
func (b *Barrier) Wait(key string, jobID int64) (wait bool, previousFailedJobID *int64) {
func (b *Barrier) Wait(key BarrierKey, jobID int64) (wait bool, previousFailedJobID *int64) {
b.mu.RLock()
defer b.mu.RUnlock()
barrier, ok := b.barriers[key]
Expand Down Expand Up @@ -227,7 +243,7 @@ func (b *Barrier) Wait(key string, jobID int64) (wait bool, previousFailedJobID
// StateChanged must be called at the end, after the job state change has been persisted.
// The only exception to this rule is when a job has failed in a retryable manner, in this scenario you should notify the barrier immediately after the failure.
// An [ErrUnsupportedState] error will be returned if the state is not supported.
func (b *Barrier) StateChanged(key string, jobID int64, state string) error {
func (b *Barrier) StateChanged(key BarrierKey, jobID int64, state string) error {
b.mu.Lock()
defer b.mu.Unlock()

Expand Down Expand Up @@ -269,7 +285,7 @@ func (b *Barrier) Sync() int {
}

// Disabled returns [true] if the barrier is disabled for this key, [false] otherwise
func (b *Barrier) Disabled(key string) bool {
func (b *Barrier) Disabled(key BarrierKey) bool {
b.mu.RLock()
defer b.mu.RUnlock()
barrier, ok := b.barriers[key]
Expand Down Expand Up @@ -304,7 +320,11 @@ func (b *Barrier) String() string {
//
// 1. Disabled: transitions to half-enabled after disabledStateDuration
// 2. Half-enabled: transitions to enabled after halfEnabledStateDuration
func (b *Barrier) updateState(barrier *barrierInfo) {
func (b *Barrier) updateState(barrier *barrierInfo, key BarrierKey) {
if b.orderingDisabledForKey != nil && b.orderingDisabledForKey(key) {
barrier.state = stateDisabled
barrier.stateTime = time.Now()
}
switch barrier.state {
case stateDisabled:
if time.Since(barrier.stateTime) > b.disabledStateDuration.Load() {
Expand Down Expand Up @@ -352,8 +372,8 @@ func (bi *barrierInfo) Leave(jobID int64) {
delete(bi.drainLimiter, jobID)
}

// ConcurrencyLimitReached returns true if the barrier's concurrency limit has been reached
func (bi *barrierInfo) ConcurrencyLimitReached(jobID int64, limit int) bool {
// concurrencyLimitReached returns true if the barrier's concurrency limit has been reached
func (bi *barrierInfo) concurrencyLimitReached(jobID int64, limit int) bool {
if bi.concurrencyLimiter != nil {
if _, ok := bi.concurrencyLimiter[jobID]; !ok {
return len(bi.concurrencyLimiter) >= limit
Expand Down Expand Up @@ -386,7 +406,7 @@ type command interface {
}

// cmd is the common payload embedded by the barrier's queued commands,
// identifying the ordering key and the job a command refers to.
type cmd struct {
	key   BarrierKey
	jobID int64
}

Expand Down
12 changes: 6 additions & 6 deletions router/internal/eventorder/eventorder_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ func (g *generatorLoop) run() {
g.runtime.minJobID = job.id
}
// randomly drain 0.1% of non-previously failed jobs (previously failed jobs cannot be drained at this stage)
if previousFailedJobID := g.barrier.Peek(job.user); previousFailedJobID != nil && *previousFailedJobID != job.id && rand.Intn(1000) < 1 { // skipcq: GSC-G404
if err := g.barrier.StateChanged(job.user, job.id, jobsdb.Aborted.State); err != nil {
if previousFailedJobID := g.barrier.Peek(eventorder.BarrierKey{UserID: job.user}); previousFailedJobID != nil && *previousFailedJobID != job.id && rand.Intn(1000) < 1 { // skipcq: GSC-G404
if err := g.barrier.StateChanged(eventorder.BarrierKey{UserID: job.user}, job.id, jobsdb.Aborted.State); err != nil {
panic(fmt.Errorf("could not drain job:%d: %w", job.id, err))
}
g.drained = append(g.drained, job)
g.logger.Logf("drained job:%d", job.id)
continue
}

if accept, blockJobID := g.barrier.Enter(job.user, job.id); accept {
if accept, blockJobID := g.barrier.Enter(eventorder.BarrierKey{UserID: job.user}, job.id); accept {
if blockJobID != nil && *blockJobID > job.id {
panic(fmt.Errorf("job.JobID:%d < blockJobID:%d", job.id, *blockJobID))
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func (wp *workerProcess) processJobs() {
for _, job := range wp.jobs {
// introduce some random delay during processing so that buffers don't empty at a steady pace
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) // skipcq: GSC-G404
wait, previousFailedJobID := wp.barrier.Wait(job.user, job.id)
wait, previousFailedJobID := wp.barrier.Wait(eventorder.BarrierKey{UserID: job.user}, job.id)

if wait {
job.states = append([]string{jobsdb.Waiting.State}, job.states...)
Expand All @@ -215,7 +215,7 @@ func (wp *workerProcess) processJobs() {
continue
}
if job.states[0] == jobsdb.Failed.State {
_ = wp.barrier.StateChanged(job.user, job.id, jobsdb.Failed.State)
_ = wp.barrier.StateChanged(eventorder.BarrierKey{UserID: job.user}, job.id, jobsdb.Failed.State)
}
wp.out <- job
}
Expand Down Expand Up @@ -268,7 +268,7 @@ func (cl *commitStatusLoop) commit() {
time.Sleep(time.Duration(rand.Intn(2)) * time.Millisecond) // skipcq: GSC-G404
switch job.states[0] {
case "aborted", "succeeded", "waiting":
_ = cl.barrier.StateChanged(job.user, job.id, job.states[0])
_ = cl.barrier.StateChanged(eventorder.BarrierKey{UserID: job.user}, job.id, job.states[0])
}
if len(job.states) == 1 {
if job.states[0] != "succeeded" && job.states[0] != "aborted" {
Expand Down