Skip to content

Commit

Permalink
fix: router destinationsMap access (#2582)
Browse files Browse the repository at this point in the history
* fix.routerDestinationsMapAccess
* job fails if destination is not found in config
  • Loading branch information
Sidddddarth committed Oct 18, 2022
1 parent ab20ad6 commit 3770720
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 3 deletions.
6 changes: 3 additions & 3 deletions router/router.go
Expand Up @@ -446,23 +446,23 @@ func (worker *workerT) workerProcess() {

worker.rt.configSubscriberLock.RLock()
batchDestination, ok := worker.rt.destinationsMap[parameters.DestinationID]
destination := batchDestination.Destination
worker.rt.configSubscriberLock.RUnlock()
if !ok {
status := jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum,
JobState: jobsdb.Aborted.State,
JobState: jobsdb.Failed.State,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "",
ErrorResponse: []byte(`{"reason": "Aborted because destination is not available in the config" }`),
ErrorResponse: []byte(`{"reason": "failed because destination is not available in the config" }`),
Parameters: routerutils.EmptyPayload,
WorkspaceId: job.WorkspaceId,
}
worker.rt.responseQ <- jobResponseT{status: &status, worker: worker, userID: userID, JobT: job}
continue
}
destination := batchDestination.Destination
if authType := routerutils.GetAuthType(destination); routerutils.IsNotEmptyString(authType) && authType == "OAuth" {
rudderAccountID := routerutils.GetRudderAccountId(&destination)
if routerutils.IsNotEmptyString(rudderAccountID) {
Expand Down
100 changes: 100 additions & 0 deletions router/router_test.go
Expand Up @@ -73,6 +73,7 @@ const (
// testRemoteAddress = "test.com"
gaDestinationDefinitionID = "gaid1"
gaDestinationID = "did1"
nonexistentDestinationID = "non-existent-destination-id"
)

var (
Expand Down Expand Up @@ -597,6 +598,105 @@ var _ = Describe("Router", func() {
Expect(count).To(Equal(5))
<-done
})

It("fails jobs if destination is not found in config", func() {
mockMultitenantHandle := mocksMultitenant.NewMockMultiTenantI(c.mockCtrl)
mockNetHandle := mocksRouter.NewMockNetHandleI(c.mockCtrl)
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
router := &HandleT{
Reporting: &reporting.NOOP{},
MultitenantI: mockMultitenantHandle,
netHandle: mockNetHandle,
}
mockMultitenantHandle.EXPECT().UpdateWorkspaceLatencyMap(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
c.mockBackendConfig.EXPECT().AccessToken().AnyTimes()
router.Setup(c.mockBackendConfig, c.mockRouterJobsDB, c.mockProcErrorsDB, gaDestinationConfig, transientsource.NewEmptyService(), rsources.NewNoOpService())
router.transformer = mockTransformer
router.noOfWorkers = 1
router.noOfJobsToBatchInAWorker = 5
router.routerTimeout = time.Duration(60) * time.Second

gaPayload := `{"body": {"XML": {}, "FORM": {}, "JSON": {}}, "type": "REST", "files": {}, "method": "POST", "params": {"t": "event", "v": "1", "an": "RudderAndroidClient", "av": "1.0", "ds": "android-sdk", "ea": "Demo Track", "ec": "Demo Category", "el": "Demo Label", "ni": 0, "qt": 59268380964, "ul": "en-US", "cid": "anon_id", "tid": "UA-185645846-1", "uip": "[::1]", "aiid": "com.rudderlabs.android.sdk"}, "userId": "anon_id", "headers": {}, "version": "1", "endpoint": "https://www.google-analytics.com/collect"}`
parameters := fmt.Sprintf(`{"source_id": "1fMCVYZboDlYlauh4GFsEo2JU77", "destination_id": "%s", "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", "received_at": "2021-06-28T10:04:48.527+05:30", "transform_at": "processor"}`, nonexistentDestinationID)

unprocessedJobsList := []*jobsdb.JobT{
{
UUID: uuid.Must(uuid.NewV4()),
UserID: "u1",
JobID: 2010,
CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
CustomVal: customVal["GA"],
EventPayload: []byte(gaPayload),
LastJobStatus: jobsdb.JobStatusT{
AttemptNum: 3, // done only to check the error response assertion(assertJobStatus) as well
},
Parameters: []byte(parameters),
},
}
workspaceCount := map[string]int{}
workspaceCount[workspaceID] = len(unprocessedJobsList)
workspaceCountOut := workspaceCount
callGetRouterPickupJobs := mockMultitenantHandle.EXPECT().
GetRouterPickupJobs(customVal["GA"], gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(workspaceCountOut, map[string]float64{}).Times(1)

payloadLimit := router.payloadLimit
callAllJobs := c.mockRouterJobsDB.EXPECT().GetAllJobs(
gomock.Any(),
workspaceCount,
jobsdb.GetQueryParamsT{
CustomValFilters: []string{customVal["GA"]},
PayloadSizeLimit: payloadLimit,
JobsLimit: len(unprocessedJobsList),
}, 10).
Times(1).
Return(unprocessedJobsList, nil).
After(callGetRouterPickupJobs)

c.mockRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1).
Do(func(ctx context.Context, statuses []*jobsdb.JobStatusT, _, _ interface{}) {
assertJobStatus(unprocessedJobsList[0], statuses[0], jobsdb.Executing.State, "", `{}`, 3)
}).Return(nil).After(callAllJobs)

done := make(chan struct{})
mockMultitenantHandle.EXPECT().
CalculateSuccessFailureCounts(
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
).
AnyTimes()
c.mockRouterJobsDB.EXPECT().
WithUpdateSafeTx(
gomock.Any(),
gomock.Any()).
Times(1).
Do(
func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) {
_ = f(jobsdb.EmptyUpdateSafeTx())
close(done)
}).Return(nil)

c.mockRouterJobsDB.EXPECT().
UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).
Times(1).
Do(func(ctx context.Context, _ jobsdb.UpdateSafeTx, statuses []*jobsdb.JobStatusT, _, _ interface{}) {
assertJobStatus(
unprocessedJobsList[0],
statuses[0],
jobsdb.Failed.State,
"",
`{"reason": "failed because destination is not available in the config" }`,
3,
)
}).Return(nil)
<-router.backendConfigInitialized
count := router.readAndProcess()
Expect(count).To(Equal(1))
<-done
})
})

Context("Router Batching", func() {
Expand Down

0 comments on commit 3770720

Please sign in to comment.