diff --git a/app/apphandlers/embeddedAppHandler.go b/app/apphandlers/embeddedAppHandler.go index 64089e43f7..52ba9d558e 100644 --- a/app/apphandlers/embeddedAppHandler.go +++ b/app/apphandlers/embeddedAppHandler.go @@ -160,7 +160,9 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)), ) defer batchRouterDB.Close() - errDB := jobsdb.NewForReadWrite( + + // We need two errorDBs, one in read & one in write mode to support separate gateway to store failures + errDBForRead := jobsdb.NewForRead( "proc_error", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithPreBackupHandlers(prebackupHandlers), @@ -168,6 +170,17 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithFileUploaderProvider(fileUploaderProvider), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)), ) + defer errDBForRead.Close() + errDBForWrite := jobsdb.NewForWrite( + "proc_error", + jobsdb.WithClearDB(options.ClearDB), + jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)), + ) + if err = errDBForWrite.Start(); err != nil { + return fmt.Errorf("could not start errDBForWrite: %w", err) + } + defer errDBForWrite.Stop() + schemaDB := jobsdb.NewForReadWrite( "esch", jobsdb.WithClearDB(options.ClearDB), @@ -200,7 +213,8 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) gwDBForProcessor, routerDB, batchRouterDB, - errDB, + errDBForRead, + errDBForWrite, schemaDB, reportingI, transientSources, @@ -219,7 +233,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) Reporting: reportingI, BackendConfig: backendconfig.DefaultBackendConfig, RouterDB: routerDB, - ProcErrorDB: errDB, + ProcErrorDB: errDBForWrite, TransientSources: transientSources, RsourcesService: rsourcesService, ThrottlerFactory: throttlerFactory, @@ -230,7 +244,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) Reporting: reportingI, BackendConfig: backendconfig.DefaultBackendConfig, RouterDB: batchRouterDB, - ProcErrorDB: errDB, + ProcErrorDB: errDBForWrite, TransientSources: transientSources, RsourcesService: rsourcesService, Debugger: destinationHandle, @@ -243,7 +257,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) GatewayDB: gwDBForProcessor, RouterDB: routerDB, BatchRouterDB: batchRouterDB, - ErrorDB: errDB, + ErrorDB: errDBForRead, EventSchemaDB: schemaDB, Processor: proc, Router: rt, @@ -262,7 +276,6 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) "gw", jobsdb.WithClearDB(options.ClearDB), ) - defer gwDBForProcessor.Close() if err = gatewayDB.Start(); err != nil { return fmt.Errorf("could not start gateway: %w", err) } @@ -270,7 +283,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) err = gw.Setup( ctx, - a.app, backendconfig.DefaultBackendConfig, gatewayDB, + a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDBForWrite, rateLimiter, a.versionHandler, rsourcesService, sourceHandle, ) if err != nil { diff --git a/app/apphandlers/gatewayAppHandler.go b/app/apphandlers/gatewayAppHandler.go index 3e6dcead78..d4e08df196 100644 --- a/app/apphandlers/gatewayAppHandler.go +++ b/app/apphandlers/gatewayAppHandler.go @@ -89,6 +89,18 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) } defer gatewayDB.Stop() + errDB := jobsdb.NewForWrite( + "proc_error", + jobsdb.WithClearDB(options.ClearDB), + jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)), + ) + defer errDB.Close() + + if err := errDB.Start(); err != nil { + return fmt.Errorf("could not start errDB: %w", err) + } + defer errDB.Stop() + g, ctx := errgroup.WithContext(ctx) modeProvider, err := resolveModeProvider(a.log, deploymentType) @@ -115,7 +127,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) } err = gw.Setup( ctx, - a.app, backendconfig.DefaultBackendConfig, gatewayDB, + a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDB, rateLimiter, a.versionHandler, rsourcesService, sourceHandle, ) if err != nil { diff --git a/app/apphandlers/processorAppHandler.go b/app/apphandlers/processorAppHandler.go index fbd9839da6..df38cedd80 100644 --- a/app/apphandlers/processorAppHandler.go +++ b/app/apphandlers/processorAppHandler.go @@ -169,7 +169,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)), ) defer batchRouterDB.Close() - errDB := jobsdb.NewForReadWrite( + errDBForRead := jobsdb.NewForRead( "proc_error", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithPreBackupHandlers(prebackupHandlers), @@ -177,6 +177,16 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options jobsdb.WithFileUploaderProvider(fileUploaderProvider), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)), ) + defer errDBForRead.Close() + errDBForWrite := jobsdb.NewForWrite( + "proc_error", + jobsdb.WithClearDB(options.ClearDB), + jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)), + ) + if err = errDBForWrite.Start(); err != nil { + return fmt.Errorf("could not start errDBForWrite: %w", err) + } + defer errDBForWrite.Stop() schemaDB := jobsdb.NewForReadWrite( "esch", jobsdb.WithClearDB(options.ClearDB), @@ -209,7 +219,8 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options gwDBForProcessor, routerDB, batchRouterDB, - errDB, + errDBForRead, + errDBForWrite, schemaDB, reportingI, transientSources, @@ -228,7 +239,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options Reporting: reportingI, BackendConfig: backendconfig.DefaultBackendConfig, RouterDB: routerDB, - ProcErrorDB: errDB, + ProcErrorDB: errDBForWrite, TransientSources: transientSources, RsourcesService: rsourcesService, ThrottlerFactory: throttlerFactory, @@ -239,7 +250,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options Reporting: reportingI, BackendConfig: backendconfig.DefaultBackendConfig, RouterDB: batchRouterDB, - ProcErrorDB: errDB, + ProcErrorDB: errDBForWrite, TransientSources: transientSources, RsourcesService: rsourcesService, Debugger: destinationHandle, @@ -253,7 +264,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options GatewayDB: gwDBForProcessor, RouterDB: routerDB, BatchRouterDB: batchRouterDB, - ErrorDB: errDB, + ErrorDB: errDBForRead, SchemaForwarder: schemaForwarder, EventSchemaDB: schemaDB, Processor: p, diff --git a/app/cluster/integration_test.go b/app/cluster/integration_test.go index 29f156e9a8..22c84139f2 100644 --- a/app/cluster/integration_test.go +++ b/app/cluster/integration_test.go @@ -184,8 +184,11 @@ func TestDynamicClusterManager(t *testing.T) { defer rtDB.TearDown() brtDB := jobsdb.NewForReadWrite("batch_rt") defer brtDB.TearDown() - errDB := jobsdb.NewForReadWrite("proc_error") - defer errDB.TearDown() + readErrDB := jobsdb.NewForRead("proc_error") + defer readErrDB.TearDown() + writeErrDB := jobsdb.NewForWrite("proc_error") + require.NoError(t, writeErrDB.Start()) + defer writeErrDB.TearDown() clearDb := false ctx := context.Background() @@ -198,7 +201,8 @@ func TestDynamicClusterManager(t *testing.T) { gwDB, rtDB, brtDB, - errDB, + readErrDB, + writeErrDB, eschDB, &reporting.NOOP{}, transientsource.NewEmptyService(), @@ -217,7 +221,7 @@ func TestDynamicClusterManager(t *testing.T) { Reporting: &reporting.NOOP{}, BackendConfig: mockBackendConfig, RouterDB: rtDB, - ProcErrorDB: errDB, + ProcErrorDB: readErrDB, TransientSources: transientsource.NewEmptyService(), RsourcesService: mockRsourcesService, } @@ -225,7 +229,7 @@ func TestDynamicClusterManager(t *testing.T) { Reporting: &reporting.NOOP{}, BackendConfig: mockBackendConfig, RouterDB: brtDB, - ProcErrorDB: errDB, + ProcErrorDB: readErrDB, TransientSources: transientsource.NewEmptyService(), RsourcesService: mockRsourcesService, } @@ -252,7 +256,7 @@ func TestDynamicClusterManager(t *testing.T) { GatewayDB: gwDB, RouterDB: rtDB, BatchRouterDB: brtDB, - ErrorDB: errDB, + ErrorDB: readErrDB, EventSchemaDB: eschDB, SchemaForwarder: schemaForwarder, diff --git a/gateway/gateway.go b/gateway/gateway.go index 486b419d8c..e1faf87a20 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -19,6 +19,7 @@ import ( "time" "github.com/rudderlabs/rudder-server/gateway/internal/bot" + "github.com/rudderlabs/rudder-server/gateway/webhook/model" "golang.org/x/sync/errgroup" @@ -162,6 +163,7 @@ type HandleT struct { userWorkerBatchRequestQ chan *userWorkerBatchRequestT batchUserWorkerBatchRequestQ chan *batchUserWorkerBatchRequestT jobsDB jobsdb.JobsDB + errDB jobsdb.JobsDB ackCount uint64 recvCount uint64 backendConfig backendconfig.BackendConfig @@ -343,6 +345,7 @@ func (gateway *HandleT) dbWriterWorkerProcess() { errorMessagesMap[batch[0].UUID] = errorMessage } } + cancel() gateway.dbWritesStat.Count(1) @@ -1496,6 +1499,38 @@ func (*HandleT) GetWebhookSourceDefName(writeKey string) (name string, ok bool) return } +// SaveErrors saves errors to the error db +func (gateway *HandleT) SaveWebhookFailures(reqs []*model.FailedWebhookPayload) error { + jobs := make([]*jobsdb.JobT, 0, len(reqs)) + for _, req := range reqs { + params := map[string]interface{}{ + "source_id": gateway.getSourceIDForWriteKey(req.WriteKey), + "stage": "webhook", + "source_type": req.SourceType, + "reason": req.Reason, + } + marshalledParams, err := json.Marshal(params) + if err != nil { + gateway.logger.Errorf("[Gateway] Failed to marshal parameters map. Parameters: %+v", params) + marshalledParams = []byte(`{"error": "rudder-server gateway failed to marshal params"}`) + } + + jobs = append(jobs, &jobsdb.JobT{ + UUID: uuid.New(), + UserID: uuid.New().String(), // Using a random userid for these failures. There is no notion of user id for these events. + Parameters: marshalledParams, + CustomVal: "WEBHOOK", + EventPayload: req.Payload, + EventCount: 1, + WorkspaceId: gateway.getWorkspaceForWriteKey(req.WriteKey), + }) + } + + ctx, cancel := context.WithTimeout(context.Background(), WriteTimeout) + defer cancel() + return gateway.errDB.Store(ctx, jobs) +} + /* Setup initializes this module: - Monitors backend config for changes. @@ -1507,7 +1542,7 @@ This function will block until backend config is initially received. */ func (gateway *HandleT) Setup( ctx context.Context, - application app.App, backendConfig backendconfig.BackendConfig, jobsDB jobsdb.JobsDB, + application app.App, backendConfig backendconfig.BackendConfig, jobsDB, errDB jobsdb.JobsDB, rateLimiter throttler.Throttler, versionHandler func(w http.ResponseWriter, r *http.Request), rsourcesService rsources.JobService, sourcehandle sourcedebugger.SourceDebugger, ) error { @@ -1540,6 +1575,7 @@ func (gateway *HandleT) Setup( gateway.batchUserWorkerBatchRequestQ = make(chan *batchUserWorkerBatchRequestT, maxDBWriterProcess) gateway.emptyAnonIdHeaderStat = gateway.stats.NewStat("gateway.empty_anonymous_id_header", stats.CountType) gateway.jobsDB = jobsDB + gateway.errDB = errDB gateway.versionHandler = versionHandler diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index f6d62d81a6..cc0eb71fac 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -35,6 +35,7 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/enterprise/suppress-user/model" "github.com/rudderlabs/rudder-server/gateway/response" + webhookModel "github.com/rudderlabs/rudder-server/gateway/webhook/model" "github.com/rudderlabs/rudder-server/jobsdb" mocksApp "github.com/rudderlabs/rudder-server/mocks/app" mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config" @@ -110,6 +111,7 @@ type testContext struct { mockCtrl *gomock.Controller mockJobsDB *mocksJobsDB.MockJobsDB + mockErrJobsDB *mocksJobsDB.MockJobsDB mockBackendConfig *mocksBackendConfig.MockBackendConfig mockRateLimiter *mockGateway.MockThrottler mockApp *mocksApp.MockApp @@ -141,6 +143,7 @@ func (c *testContext) Setup() { c.asyncHelper.Setup() c.mockCtrl = gomock.NewController(GinkgoT()) c.mockJobsDB = mocksJobsDB.NewMockJobsDB(c.mockCtrl) + c.mockErrJobsDB = mocksJobsDB.NewMockJobsDB(c.mockCtrl) c.mockBackendConfig = mocksBackendConfig.NewMockBackendConfig(c.mockCtrl) c.mockApp = mocksApp.NewMockApp(c.mockCtrl) c.mockRateLimiter = mockGateway.NewMockThrottler(c.mockCtrl) @@ -233,7 +236,7 @@ var _ = Describe("Gateway Enterprise", func() { gateway = &HandleT{} BeforeEach(func() { - err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) statsStore = memstats.New() gateway.stats = statsStore @@ -359,7 +362,7 @@ var _ = Describe("Gateway", func() { Context("Initialization", func() { It("should wait for backend config", func() { gateway := &HandleT{} - err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) err = gateway.Shutdown() Expect(err).To(BeNil()) @@ -393,7 +396,7 @@ var _ = Describe("Gateway", func() { loadConfig() gateway = &HandleT{} - err = gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err = gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) gateway.irh = mockRequestHandler{} gateway.rrh = mockRequestHandler{} @@ -480,7 +483,7 @@ var _ = Describe("Gateway", func() { BeforeEach(func() { gateway = &HandleT{} - err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) statsStore = memstats.New() gateway.stats = statsStore @@ -631,7 +634,7 @@ var _ = Describe("Gateway", func() { BeforeEach(func() { gateway = &HandleT{} - err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) statsStore = memstats.New() @@ -734,7 +737,7 @@ var _ = Describe("Gateway", func() { BeforeEach(func() { gateway = &HandleT{} SetEnableRateLimit(true) - err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) statsStore = memstats.New() gateway.stats = statsStore @@ -821,7 +824,7 @@ var _ = Describe("Gateway", func() { BeforeEach(func() { gateway = &HandleT{} - err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) statsStore = memstats.New() gateway.stats = statsStore @@ -1115,7 +1118,7 @@ var _ = Describe("Gateway", func() { BeforeEach(func() { gateway = &HandleT{} - err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) }) @@ -1137,7 +1140,7 @@ var _ = Describe("Gateway", func() { ) BeforeEach(func() { gateway = &HandleT{} - err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) }) @@ -1229,6 +1232,73 @@ var _ = Describe("Gateway", func() { Expect(err).To(BeNil()) }) }) + + Context("SaveWebhookFailures", func() { + var gateway *HandleT + BeforeEach(func() { + gateway = &HandleT{} + err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + Expect(err).To(BeNil()) + }) + + AfterEach(func() { + err := gateway.Shutdown() + Expect(err).To(BeNil()) + }) + + It("should save failures to error db", func() { + c.mockErrJobsDB. + EXPECT().Store( + gomock.Any(), + gomock.Any(), + ). + DoAndReturn( + func( + ctx context.Context, + jobs []*jobsdb.JobT, + ) error { + for idx, job := range jobs { + Expect(misc.IsValidUUID(job.UUID.String())).To(Equal(true)) + Expect(job.CustomVal).To(Equal("WEBHOOK")) + + var paramsMap, expectedParamsMap map[string]interface{} + var expectedStr []byte + + switch idx { + case 0: + Expect(job.EventPayload).To(Equal(json.RawMessage(`{"a1": "b1"}`))) + expectedStr = []byte(fmt.Sprintf(`{"source_id": "%v", "stage": "webhook", "source_type": "cio", "reason": "err1"}`, SourceIDEnabled)) + case 1: + Expect(job.EventPayload).To(Equal(json.RawMessage(`{"a2": "b2"}`))) + expectedStr = []byte(fmt.Sprintf(`{"source_id": "%v", "stage": "webhook", "source_type": "af", "reason": "err2"}`, SourceIDEnabled)) + } + + _ = json.Unmarshal(job.Parameters, ¶msMap) + _ = json.Unmarshal(expectedStr, &expectedParamsMap) + equals := reflect.DeepEqual(paramsMap, expectedParamsMap) + Expect(equals).To(Equal(true)) + } + return nil + }). + Times(1) + + reqs := make([]*webhookModel.FailedWebhookPayload, 2) + reqs[0] = &webhookModel.FailedWebhookPayload{ + WriteKey: WriteKeyEnabled, + Payload: []byte(`{"a1": "b1"}`), + SourceType: "cio", + Reason: "err1", + } + reqs[1] = &webhookModel.FailedWebhookPayload{ + WriteKey: WriteKeyEnabled, + Payload: []byte(`{"a2": "b2"}`), + SourceType: "af", + Reason: "err2", + } + err := gateway.SaveWebhookFailures(reqs) + Expect(err).To(BeNil()) + }) + }) }) func unauthorizedRequest(body io.Reader) *http.Request { diff --git a/gateway/mocks/mockwebhook.go b/gateway/mocks/mockwebhook.go index dee9168a5c..73aad82a04 100644 --- a/gateway/mocks/mockwebhook.go +++ b/gateway/mocks/mockwebhook.go @@ -10,6 +10,7 @@ import ( gomock "github.com/golang/mock/gomock" stats "github.com/rudderlabs/rudder-server/gateway/internal/stats" + model "github.com/rudderlabs/rudder-server/gateway/webhook/model" ) // MockGatewayI is a mock of GatewayI interface. @@ -102,6 +103,20 @@ func (mr *MockGatewayIMockRecorder) ProcessWebRequest(arg0, arg1, arg2, arg3, ar return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessWebRequest", reflect.TypeOf((*MockGatewayI)(nil).ProcessWebRequest), arg0, arg1, arg2, arg3, arg4) } +// SaveWebhookFailures mocks base method. +func (m *MockGatewayI) SaveWebhookFailures(arg0 []*model.FailedWebhookPayload) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveWebhookFailures", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveWebhookFailures indicates an expected call of SaveWebhookFailures. +func (mr *MockGatewayIMockRecorder) SaveWebhookFailures(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWebhookFailures", reflect.TypeOf((*MockGatewayI)(nil).SaveWebhookFailures), arg0) +} + // TrackRequestMetrics mocks base method. func (m *MockGatewayI) TrackRequestMetrics(arg0 string) { m.ctrl.T.Helper() diff --git a/gateway/webhook/model/model.go b/gateway/webhook/model/model.go new file mode 100644 index 0000000000..9d92103ef8 --- /dev/null +++ b/gateway/webhook/model/model.go @@ -0,0 +1,8 @@ +package model + +type FailedWebhookPayload struct { + WriteKey string + Payload []byte + SourceType string + Reason string +} diff --git a/gateway/webhook/setup.go b/gateway/webhook/setup.go index b367567cb3..11ea17b2df 100644 --- a/gateway/webhook/setup.go +++ b/gateway/webhook/setup.go @@ -13,6 +13,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/stats" gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats" + "github.com/rudderlabs/rudder-server/gateway/webhook/model" "github.com/rudderlabs/rudder-server/utils/misc" ) @@ -23,6 +24,7 @@ type GatewayI interface { ProcessWebRequest(writer *http.ResponseWriter, req *http.Request, reqType string, requestPayload []byte, writeKey string) string GetWebhookSourceDefName(writeKey string) (name string, ok bool) NewSourceStat(writeKey, reqType string) *gwstats.SourceStat + SaveWebhookFailures([]*model.FailedWebhookPayload) error } type WebHookI interface { diff --git a/gateway/webhook/webhook.go b/gateway/webhook/webhook.go index 7fb7fb396d..5c0f895ba9 100644 --- a/gateway/webhook/webhook.go +++ b/gateway/webhook/webhook.go @@ -23,6 +23,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/stats" gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats" "github.com/rudderlabs/rudder-server/gateway/response" + "github.com/rudderlabs/rudder-server/gateway/webhook/model" "github.com/rudderlabs/rudder-server/utils/misc" ) @@ -391,9 +392,25 @@ func (bt *batchWebhookTransformerT) batchTransformLoop() { for _, req := range breq.batchRequest { req.done <- transformerResponse{StatusCode: statusCode, Err: batchResponse.batchError.Error()} } + + // Saving failures to errors jobsdb + failedWebhookPayloads := make([]*model.FailedWebhookPayload, len(webRequests)) + for i, webRequest := range webRequests { + failedWebhookPayloads[i] = &model.FailedWebhookPayload{ + WriteKey: webRequest.writeKey, + Payload: payloadArr[i], + SourceType: breq.sourceType, + Reason: batchResponse.batchError.Error(), + } + } + if err := bt.webhook.gwHandle.SaveWebhookFailures(failedWebhookPayloads); err != nil { + pkgLogger.Errorf("Saving webhook failures of sourceType: %s, failed. Error: %s", breq.sourceType, err.Error()) + } + continue } + failedWebhookPayloads := make([]*model.FailedWebhookPayload, 0) bt.stats.sourceStats[breq.sourceType].numOutputEvents.Count(len(batchResponse.responses)) for idx, resp := range batchResponse.responses { @@ -411,16 +428,25 @@ func (bt *batchWebhookTransformerT) batchTransformLoop() { if errMessage != "" { pkgLogger.Errorf("webhook %s source transformation failed: %s", breq.sourceType, errMessage) bt.webhook.countWebhookErrors(breq.sourceType, webRequest.writeKey, reason, response.GetErrorStatusCode(errMessage), 1) + failedWebhookPayloads = append(failedWebhookPayloads, &model.FailedWebhookPayload{WriteKey: webRequest.writeKey, Payload: payloadArr[idx], SourceType: breq.sourceType, Reason: errMessage}) webRequest.done <- bt.markResponseFail(errMessage) continue } } else if resp.StatusCode != http.StatusOK { + failedWebhookPayloads = append(failedWebhookPayloads, &model.FailedWebhookPayload{WriteKey: webRequest.writeKey, Payload: payloadArr[idx], SourceType: breq.sourceType, Reason: resp.Err}) pkgLogger.Errorf("webhook %s source transformation failed with error: %s and status code: %s", breq.sourceType, resp.Err, resp.StatusCode) bt.webhook.countWebhookErrors(breq.sourceType, webRequest.writeKey, "non 200 response", resp.StatusCode, 1) } webRequest.done <- resp } + + // Saving failures to errors jobsdb + if len(failedWebhookPayloads) > 0 { + if err := bt.webhook.gwHandle.SaveWebhookFailures(failedWebhookPayloads); err != nil { + pkgLogger.Errorf("Saving webhook failures of sourceType: %s, failed. Error: %s", breq.sourceType, err.Error()) + } + } } } diff --git a/gateway/webhook/webhook_test.go b/gateway/webhook/webhook_test.go index cb23b82f45..8c45dd2640 100644 --- a/gateway/webhook/webhook_test.go +++ b/gateway/webhook/webhook_test.go @@ -67,6 +67,7 @@ func TestWebhookRequestHandlerWithTransformerBatchGeneralError(t *testing.T) { mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true) mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1) mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(2) + mockGW.EXPECT().SaveWebhookFailures(gomock.Any()).Return(nil).Times(1) webhookHandler.Register(sourceDefName) req := httptest.NewRequest(http.MethodPost, "/v1/webhook?writeKey="+sampleWriteKey, bytes.NewBufferString(sampleJson)) @@ -108,6 +109,7 @@ func TestWebhookRequestHandlerWithTransformerBatchPayloadLengthMismatchError(t * mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true) mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1) mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(2) + mockGW.EXPECT().SaveWebhookFailures(gomock.Any()).Return(nil).Times(1) webhookHandler.Register(sourceDefName) req := httptest.NewRequest(http.MethodPost, "/v1/webhook?writeKey="+sampleWriteKey, bytes.NewBufferString(sampleJson)) @@ -148,6 +150,7 @@ func TestWebhookRequestHandlerWithTransformerRequestError(t *testing.T) { mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true) mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1) mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(2) + mockGW.EXPECT().SaveWebhookFailures(gomock.Any()).Return(nil).Times(1) webhookHandler.Register(sourceDefName) req := httptest.NewRequest(http.MethodPost, "/v1/webhook?writeKey="+sampleWriteKey, bytes.NewBufferString(sampleJson)) diff --git a/processor/manager.go b/processor/manager.go index f8390e48b7..59c0cd64a6 100644 --- a/processor/manager.go +++ b/processor/manager.go @@ -25,7 +25,8 @@ type LifecycleManager struct { gatewayDB *jobsdb.HandleT routerDB *jobsdb.HandleT batchRouterDB *jobsdb.HandleT - errDB *jobsdb.HandleT + readErrDB *jobsdb.HandleT + writeErrDB *jobsdb.HandleT esDB *jobsdb.HandleT clearDB *bool ReportingI types.Reporting // need not initialize again @@ -47,7 +48,7 @@ func (proc *LifecycleManager) Start() error { } proc.Handle.Setup( - proc.BackendConfig, proc.gatewayDB, proc.routerDB, proc.batchRouterDB, proc.errDB, proc.esDB, + proc.BackendConfig, proc.gatewayDB, proc.routerDB, proc.batchRouterDB, proc.readErrDB, proc.writeErrDB, proc.esDB, proc.ReportingI, proc.transientSources, proc.fileuploader, proc.rsourcesService, proc.destDebugger, proc.transDebugger, ) @@ -81,7 +82,7 @@ func WithFeaturesRetryMaxAttempts(maxAttempts int) func(l *LifecycleManager) { } // New creates a new Processor instance -func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDb, esDB *jobsdb.HandleT, +func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB *jobsdb.HandleT, reporting types.Reporting, transientSources transientsource.Service, fileuploader fileuploader.Provider, rsourcesService rsources.JobService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger, opts ...Opts, @@ -92,7 +93,8 @@ func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDb, esDB *job gatewayDB: gwDb, routerDB: rtDb, batchRouterDB: brtDb, - errDB: errDb, + readErrDB: errDbForRead, + writeErrDB: errDBForWrite, esDB: esDB, clearDB: clearDb, BackendConfig: backendconfig.DefaultBackendConfig, diff --git a/processor/manager_test.go b/processor/manager_test.go index 5cf05ab16c..06ae62bdf5 100644 --- a/processor/manager_test.go +++ b/processor/manager_test.go @@ -189,8 +189,11 @@ func TestProcessorManager(t *testing.T) { defer rtDB.Close() brtDB := jobsdb.NewForReadWrite("batch_rt") defer brtDB.Close() - errDB := jobsdb.NewForReadWrite("proc_error") - defer errDB.Close() + readErrDB := jobsdb.NewForRead("proc_error") + defer readErrDB.Close() + writeErrDB := jobsdb.NewForWrite("proc_error") + require.NoError(t, writeErrDB.Start()) + defer writeErrDB.TearDown() eschDB := jobsdb.NewForReadWrite("esch") defer eschDB.Close() @@ -203,7 +206,8 @@ func TestProcessorManager(t *testing.T) { gwDB, rtDB, brtDB, - errDB, + readErrDB, + writeErrDB, eschDB, &reporting.NOOP{}, transientsource.NewEmptyService(), @@ -223,8 +227,8 @@ func TestProcessorManager(t *testing.T) { defer rtDB.Stop() require.NoError(t, brtDB.Start()) defer brtDB.Stop() - require.NoError(t, errDB.Start()) - defer errDB.Stop() + require.NoError(t, readErrDB.Start()) + defer readErrDB.Stop() mockBackendConfig.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1).DoAndReturn( func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel { ch := make(chan pubsub.DataEvent, 1) @@ -262,8 +266,8 @@ func TestProcessorManager(t *testing.T) { defer rtDB.Stop() require.NoError(t, brtDB.Start()) defer brtDB.Stop() - require.NoError(t, errDB.Start()) - defer errDB.Stop() + require.NoError(t, readErrDB.Start()) + defer readErrDB.Stop() mockBackendConfig.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1).DoAndReturn( func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel { ch := make(chan pubsub.DataEvent, 1) diff --git a/processor/processor.go b/processor/processor.go index 61d72186c2..b81c9baae2 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -75,7 +75,8 @@ type Handle struct { gatewayDB jobsdb.JobsDB routerDB jobsdb.JobsDB batchRouterDB jobsdb.JobsDB - errorDB jobsdb.JobsDB + readErrorDB jobsdb.JobsDB + writeErrorDB jobsdb.JobsDB eventSchemaDB jobsdb.JobsDB logger logger.Logger @@ -322,7 +323,7 @@ func (proc *Handle) newEventFilterStat(sourceID, workspaceID string, destination // Setup initializes the module func (proc *Handle) Setup( backendConfig backendconfig.BackendConfig, gatewayDB, routerDB, - batchRouterDB, errorDB, eventSchemaDB jobsdb.JobsDB, reporting types.Reporting, + batchRouterDB, readErrorDB, writeErrorDB, eventSchemaDB jobsdb.JobsDB, reporting types.Reporting, transientSources transientsource.Service, fileuploader fileuploader.Provider, rsourcesService rsources.JobService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger, ) { @@ -339,7 +340,8 @@ func (proc *Handle) Setup( proc.gatewayDB = gatewayDB proc.routerDB = routerDB proc.batchRouterDB = batchRouterDB - proc.errorDB = errorDB + proc.readErrorDB = readErrorDB + proc.writeErrorDB = writeErrorDB proc.eventSchemaDB = eventSchemaDB proc.transientSources = transientSources @@ -526,7 +528,7 @@ func (proc *Handle) Start(ctx context.Context) error { g.Go(misc.WithBugsnag(func() error { st := stash.New() st.Setup( - proc.errorDB, + proc.readErrorDB, proc.transientSources, proc.fileuploader, proc.adaptiveLimit, @@ -1969,7 +1971,7 @@ func (proc *Handle) Store(partition string, in *storeMessage) { } if len(in.procErrorJobs) > 0 { err := misc.RetryWithNotify(context.Background(), proc.jobsDBCommandTimeout, proc.jobdDBMaxRetries, func(ctx context.Context) error { - return proc.errorDB.Store(ctx, in.procErrorJobs) + return proc.writeErrorDB.Store(ctx, in.procErrorJobs) }, proc.sendRetryStoreStats) if err != nil { proc.logger.Errorf("Store into proc error table failed with error: %v", err) @@ -2428,7 +2430,7 @@ func (proc *Handle) saveFailedJobs(failedJobs []*jobsdb.JobT) { if len(failedJobs) > 0 { rsourcesStats := rsources.NewFailedJobsCollector(proc.rsourcesService) rsourcesStats.JobsFailed(failedJobs) - _ = proc.errorDB.WithTx(func(tx *jobsdb.Tx) error { + _ = proc.writeErrorDB.WithTx(func(tx *jobsdb.Tx) error { return rsourcesStats.Publish(context.TODO(), tx.Tx) }) } diff --git a/processor/processor_test.go b/processor/processor_test.go index 5334426a7b..202a53c516 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -53,7 +53,8 @@ type testContext struct { mockGatewayJobsDB *mocksJobsDB.MockJobsDB mockRouterJobsDB *mocksJobsDB.MockJobsDB mockBatchRouterJobsDB *mocksJobsDB.MockJobsDB - mockProcErrorsDB *mocksJobsDB.MockJobsDB + mockReadProcErrorsDB *mocksJobsDB.MockJobsDB + mockWriteProcErrorsDB *mocksJobsDB.MockJobsDB mockEventSchemasDB *mocksJobsDB.MockJobsDB MockReportingI *mockReportingTypes.MockReporting MockDedup *mockDedup.MockDedup @@ -66,7 +67,8 @@ func (c *testContext) Setup() { c.mockGatewayJobsDB = mocksJobsDB.NewMockJobsDB(c.mockCtrl) c.mockRouterJobsDB = mocksJobsDB.NewMockJobsDB(c.mockCtrl) c.mockBatchRouterJobsDB = mocksJobsDB.NewMockJobsDB(c.mockCtrl) - c.mockProcErrorsDB = mocksJobsDB.NewMockJobsDB(c.mockCtrl) + c.mockReadProcErrorsDB = mocksJobsDB.NewMockJobsDB(c.mockCtrl) + c.mockWriteProcErrorsDB = mocksJobsDB.NewMockJobsDB(c.mockCtrl) c.mockEventSchemasDB = mocksJobsDB.NewMockJobsDB(c.mockCtrl) c.MockRsourcesService = rsources.NewMockJobService(c.mockCtrl) @@ -630,7 +632,7 @@ var _ = Describe("Processor", Ordered, func() { // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) - processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, nil, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) + processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockReadProcErrorsDB, c.mockWriteProcErrorsDB, nil, nil, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) @@ -644,7 +646,7 @@ var _ = Describe("Processor", Ordered, func() { c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) - processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, nil, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) + processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockReadProcErrorsDB, c.mockWriteProcErrorsDB, nil, nil, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) @@ -663,7 +665,7 @@ var _ = Describe("Processor", Ordered, func() { processor := prepareHandle(NewHandle(mockTransformer)) - processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, c.MockReportingI, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) + processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockReadProcErrorsDB, c.mockWriteProcErrorsDB, nil, c.MockReportingI, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) @@ -1302,12 +1304,12 @@ var _ = Describe("Processor", Ordered, func() { }) // will be used to save failed events to failed keys table - c.mockProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) { + c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) { _ = f(&jobsdb.Tx{}) }).Times(1) // One Store call is expected for all events - c.mockProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1). + c.mockWriteProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1). Do(func(ctx context.Context, jobs []*jobsdb.JobT) { Expect(jobs).To(HaveLen(2)) for i, job := range jobs { @@ -1437,12 +1439,12 @@ var _ = Describe("Processor", Ordered, func() { assertJobStatus(unprocessedJobsList[0], statuses[0], jobsdb.Succeeded.State) }) - c.mockProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) { + c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) { _ = f(&jobsdb.Tx{}) }).Return(nil).Times(1) // One Store call is expected for all events - c.mockProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1). + c.mockWriteProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1). Do(func(ctx context.Context, jobs []*jobsdb.JobT) { Expect(jobs).To(HaveLen(1)) for _, job := range jobs { @@ -1517,12 +1519,12 @@ var _ = Describe("Processor", Ordered, func() { assertJobStatus(unprocessedJobsList[0], statuses[0], jobsdb.Succeeded.State) }) - c.mockProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) { + c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) { _ = f(&jobsdb.Tx{}) }).Return(nil).Times(0) // One Store call is expected for all events - c.mockProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(0). + c.mockWriteProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(0). Do(func(ctx context.Context, jobs []*jobsdb.JobT) {}) processorSetupAndAssertJobHandling(processor, c) @@ -1539,7 +1541,7 @@ var _ = Describe("Processor", Ordered, func() { // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) processor.config.featuresRetryMaxAttempts = 0 - processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, nil, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) + processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockReadProcErrorsDB, c.mockWriteProcErrorsDB, nil, nil, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) setMainLoopTimeout(processor, 1*time.Second) @@ -1547,9 +1549,9 @@ var _ = Describe("Processor", Ordered, func() { defer cancel() c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1) - c.mockProcErrorsDB.EXPECT().FailExecuting().Times(1) - c.mockProcErrorsDB.EXPECT().GetToRetry(gomock.Any(), gomock.Any()).AnyTimes() - c.mockProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).AnyTimes() + c.mockReadProcErrorsDB.EXPECT().FailExecuting().Times(1) + c.mockReadProcErrorsDB.EXPECT().GetToRetry(gomock.Any(), gomock.Any()).AnyTimes() + c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).AnyTimes() c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes() c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes() @@ -1583,15 +1585,15 @@ var _ = Describe("Processor", Ordered, func() { // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) processor.config.featuresRetryMaxAttempts = 0 - processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, c.MockReportingI, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) + processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockReadProcErrorsDB, c.mockWriteProcErrorsDB, nil, c.MockReportingI, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) defer processor.Shutdown() c.MockReportingI.EXPECT().WaitForSetup(gomock.Any(), gomock.Any()).Times(1) processor.config.readLoopSleep = time.Millisecond - c.mockProcErrorsDB.EXPECT().FailExecuting() - c.mockProcErrorsDB.EXPECT().GetToRetry(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes() - c.mockProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes() + c.mockReadProcErrorsDB.EXPECT().FailExecuting() + c.mockReadProcErrorsDB.EXPECT().GetToRetry(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes() + c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes() c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1) c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes() c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes() @@ -3027,7 +3029,8 @@ func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool) c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, - c.mockProcErrorsDB, + c.mockReadProcErrorsDB, + c.mockWriteProcErrorsDB, c.mockEventSchemasDB, c.MockReportingI, transientsource.NewEmptyService(),