Skip to content

Commit

Permalink
fix: detected illegal job sequence during barrier enter/wait (#3881)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Sep 19, 2023
1 parent 3bfe354 commit 7891da3
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
9 changes: 9 additions & 0 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1916,6 +1916,7 @@ func (sm *storeMessage) merge(subJob *storeMessage) {
for id, job := range subJob.procErrorJobsByDestID {
sm.procErrorJobsByDestID[id] = append(sm.procErrorJobsByDestID[id], job...)
}
sm.routerDestIDs = append(sm.routerDestIDs, subJob.routerDestIDs...)

sm.reportMetrics = append(sm.reportMetrics, subJob.reportMetrics...)
for dupStatKey, count := range subJob.sourceDupStats {
Expand Down Expand Up @@ -1997,6 +1998,14 @@ func (proc *Handle) Store(partition string, in *storeMessage) {
proc.storePlocker.Lock(destID)
defer proc.storePlocker.Unlock(destID)
}
} else {
proc.logger.Warnw("empty storeMessage.routerDestIDs",
"expected",
lo.Uniq(
lo.Map(destJobs, func(j *jobsdb.JobT, _ int) string {
return gjson.GetBytes(j.Parameters, "destination_id").String()
}),
))
}
err := misc.RetryWithNotify(
context.Background(),
Expand Down
66 changes: 66 additions & 0 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
"sort"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"

destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
Expand Down Expand Up @@ -3856,3 +3859,66 @@ var _ = Describe("TestConfigFilter", func() {
})
})
})

func TestStoreMessageMerge(t *testing.T) {
sm1 := &storeMessage{
statusList: []*jobsdb.JobStatusT{{JobID: 1}},
destJobs: []*jobsdb.JobT{{JobID: 1}},
batchDestJobs: []*jobsdb.JobT{{JobID: 1}},
procErrorJobsByDestID: map[string][]*jobsdb.JobT{
"1": {{JobID: 1}},
},
procErrorJobs: []*jobsdb.JobT{{JobID: 1}},
routerDestIDs: []string{"1"},
reportMetrics: []*types.PUReportedMetric{{}},
sourceDupStats: map[dupStatKey]int{{sourceID: "1"}: 1},
dedupKeys: map[string]struct{}{"1": {}},
totalEvents: 1,
}

sm2 := &storeMessage{
statusList: []*jobsdb.JobStatusT{{JobID: 2}},
destJobs: []*jobsdb.JobT{{JobID: 2}},
batchDestJobs: []*jobsdb.JobT{{JobID: 2}},
procErrorJobsByDestID: map[string][]*jobsdb.JobT{
"2": {{JobID: 2}},
},
procErrorJobs: []*jobsdb.JobT{{JobID: 2}},
routerDestIDs: []string{"2"},
reportMetrics: []*types.PUReportedMetric{{}},
sourceDupStats: map[dupStatKey]int{{sourceID: "1"}: 2},
dedupKeys: map[string]struct{}{"2": {}},
totalEvents: 1,
}

merged := storeMessage{
procErrorJobsByDestID: map[string][]*jobsdb.JobT{},
sourceDupStats: map[dupStatKey]int{},
dedupKeys: map[string]struct{}{},
}

merged.merge(sm1)
require.Len(t, merged.statusList, 1, "status list should have 1 element")
require.Len(t, merged.destJobs, 1, "dest jobs should have 1 element")
require.Len(t, merged.batchDestJobs, 1, "batch dest jobs should have 1 element")
require.Len(t, merged.procErrorJobsByDestID, 1, "proc error jobs by dest id should have 1 element")
require.Len(t, merged.procErrorJobs, 1, "proc error jobs should have 1 element")
require.Len(t, merged.routerDestIDs, 1, "router dest ids should have 1 element")
require.Len(t, merged.reportMetrics, 1, "report metrics should have 1 element")
require.Len(t, merged.sourceDupStats, 1, "source dup stats should have 1 element")
require.Len(t, merged.dedupKeys, 1, "dedup keys should have 1 element")
require.Equal(t, merged.totalEvents, 1, "total events should be 1")

merged.merge(sm2)
require.Len(t, merged.statusList, 2, "status list should have 2 elements")
require.Len(t, merged.destJobs, 2, "dest jobs should have 2 elements")
require.Len(t, merged.batchDestJobs, 2, "batch dest jobs should have 2 elements")
require.Len(t, merged.procErrorJobsByDestID, 2, "proc error jobs by dest id should have 2 elements")
require.Len(t, merged.procErrorJobs, 2, "proc error jobs should have 2 elements")
require.Len(t, merged.routerDestIDs, 2, "router dest ids should have 2 elements")
require.Len(t, merged.reportMetrics, 2, "report metrics should have 2 elements")
require.Len(t, merged.sourceDupStats, 1, "source dup stats should have 1 element")
require.EqualValues(t, merged.sourceDupStats[dupStatKey{sourceID: "1"}], 3)
require.Len(t, merged.dedupKeys, 2, "dedup keys should have 2 elements")
require.Equal(t, merged.totalEvents, 2, "total events should be 2")
}

0 comments on commit 7891da3

Please sign in to comment.