Skip to content

Commit

Permalink
feat: save webhook (/source transformation) failures to proc errors (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chandumlg committed Jul 6, 2023
1 parent cfa6132 commit 45a1802
Show file tree
Hide file tree
Showing 15 changed files with 277 additions and 66 deletions.
27 changes: 20 additions & 7 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,27 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
)
defer batchRouterDB.Close()
errDB := jobsdb.NewForReadWrite(

// We need two errorDBs, one in read & one in write mode to support separate gateway to store failures
errDBForRead := jobsdb.NewForRead(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
)
defer errDBForRead.Close()
errDBForWrite := jobsdb.NewForWrite(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
)
if err = errDBForWrite.Start(); err != nil {
return fmt.Errorf("could not start errDBForWrite: %w", err)
}
defer errDBForWrite.Stop()

schemaDB := jobsdb.NewForReadWrite(
"esch",
jobsdb.WithClearDB(options.ClearDB),
Expand Down Expand Up @@ -200,7 +213,8 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
gwDBForProcessor,
routerDB,
batchRouterDB,
errDB,
errDBForRead,
errDBForWrite,
schemaDB,
reportingI,
transientSources,
Expand All @@ -219,7 +233,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
Reporting: reportingI,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: routerDB,
ProcErrorDB: errDB,
ProcErrorDB: errDBForWrite,
TransientSources: transientSources,
RsourcesService: rsourcesService,
ThrottlerFactory: throttlerFactory,
Expand All @@ -230,7 +244,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
Reporting: reportingI,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: batchRouterDB,
ProcErrorDB: errDB,
ProcErrorDB: errDBForWrite,
TransientSources: transientSources,
RsourcesService: rsourcesService,
Debugger: destinationHandle,
Expand All @@ -243,7 +257,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
GatewayDB: gwDBForProcessor,
RouterDB: routerDB,
BatchRouterDB: batchRouterDB,
ErrorDB: errDB,
ErrorDB: errDBForRead,
EventSchemaDB: schemaDB,
Processor: proc,
Router: rt,
Expand All @@ -262,15 +276,14 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
"gw",
jobsdb.WithClearDB(options.ClearDB),
)
defer gwDBForProcessor.Close()
if err = gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gateway: %w", err)
}
defer gatewayDB.Stop()

err = gw.Setup(
ctx,
a.app, backendconfig.DefaultBackendConfig, gatewayDB,
a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDBForWrite,
rateLimiter, a.versionHandler, rsourcesService, sourceHandle,
)
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
}
defer gatewayDB.Stop()

errDB := jobsdb.NewForWrite(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
)
defer errDB.Close()

if err := errDB.Start(); err != nil {
return fmt.Errorf("could not start errDB: %w", err)
}
defer errDB.Stop()

g, ctx := errgroup.WithContext(ctx)

modeProvider, err := resolveModeProvider(a.log, deploymentType)
Expand All @@ -115,7 +127,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
}
err = gw.Setup(
ctx,
a.app, backendconfig.DefaultBackendConfig, gatewayDB,
a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDB,
rateLimiter, a.versionHandler, rsourcesService, sourceHandle,
)
if err != nil {
Expand Down
21 changes: 16 additions & 5 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,24 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
)
defer batchRouterDB.Close()
errDB := jobsdb.NewForReadWrite(
errDBForRead := jobsdb.NewForRead(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
)
defer errDBForRead.Close()
errDBForWrite := jobsdb.NewForWrite(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
)
if err = errDBForWrite.Start(); err != nil {
return fmt.Errorf("could not start errDBForWrite: %w", err)
}
defer errDBForWrite.Stop()
schemaDB := jobsdb.NewForReadWrite(
"esch",
jobsdb.WithClearDB(options.ClearDB),
Expand Down Expand Up @@ -209,7 +219,8 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
gwDBForProcessor,
routerDB,
batchRouterDB,
errDB,
errDBForRead,
errDBForWrite,
schemaDB,
reportingI,
transientSources,
Expand All @@ -228,7 +239,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
Reporting: reportingI,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: routerDB,
ProcErrorDB: errDB,
ProcErrorDB: errDBForWrite,
TransientSources: transientSources,
RsourcesService: rsourcesService,
ThrottlerFactory: throttlerFactory,
Expand All @@ -239,7 +250,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
Reporting: reportingI,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: batchRouterDB,
ProcErrorDB: errDB,
ProcErrorDB: errDBForWrite,
TransientSources: transientSources,
RsourcesService: rsourcesService,
Debugger: destinationHandle,
Expand All @@ -253,7 +264,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
GatewayDB: gwDBForProcessor,
RouterDB: routerDB,
BatchRouterDB: batchRouterDB,
ErrorDB: errDB,
ErrorDB: errDBForRead,
SchemaForwarder: schemaForwarder,
EventSchemaDB: schemaDB,
Processor: p,
Expand Down
16 changes: 10 additions & 6 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,11 @@ func TestDynamicClusterManager(t *testing.T) {
defer rtDB.TearDown()
brtDB := jobsdb.NewForReadWrite("batch_rt")
defer brtDB.TearDown()
errDB := jobsdb.NewForReadWrite("proc_error")
defer errDB.TearDown()
readErrDB := jobsdb.NewForRead("proc_error")
defer readErrDB.TearDown()
writeErrDB := jobsdb.NewForWrite("proc_error")
require.NoError(t, writeErrDB.Start())
defer writeErrDB.TearDown()

clearDb := false
ctx := context.Background()
Expand All @@ -198,7 +201,8 @@ func TestDynamicClusterManager(t *testing.T) {
gwDB,
rtDB,
brtDB,
errDB,
readErrDB,
writeErrDB,
eschDB,
&reporting.NOOP{},
transientsource.NewEmptyService(),
Expand All @@ -217,15 +221,15 @@ func TestDynamicClusterManager(t *testing.T) {
Reporting: &reporting.NOOP{},
BackendConfig: mockBackendConfig,
RouterDB: rtDB,
ProcErrorDB: errDB,
ProcErrorDB: readErrDB,
TransientSources: transientsource.NewEmptyService(),
RsourcesService: mockRsourcesService,
}
brtFactory := &batchrouter.Factory{
Reporting: &reporting.NOOP{},
BackendConfig: mockBackendConfig,
RouterDB: brtDB,
ProcErrorDB: errDB,
ProcErrorDB: readErrDB,
TransientSources: transientsource.NewEmptyService(),
RsourcesService: mockRsourcesService,
}
Expand All @@ -252,7 +256,7 @@ func TestDynamicClusterManager(t *testing.T) {
GatewayDB: gwDB,
RouterDB: rtDB,
BatchRouterDB: brtDB,
ErrorDB: errDB,
ErrorDB: readErrDB,
EventSchemaDB: eschDB,
SchemaForwarder: schemaForwarder,

Expand Down
38 changes: 37 additions & 1 deletion gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/rudderlabs/rudder-server/gateway/internal/bot"
"github.com/rudderlabs/rudder-server/gateway/webhook/model"

"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -162,6 +163,7 @@ type HandleT struct {
userWorkerBatchRequestQ chan *userWorkerBatchRequestT
batchUserWorkerBatchRequestQ chan *batchUserWorkerBatchRequestT
jobsDB jobsdb.JobsDB
errDB jobsdb.JobsDB
ackCount uint64
recvCount uint64
backendConfig backendconfig.BackendConfig
Expand Down Expand Up @@ -343,6 +345,7 @@ func (gateway *HandleT) dbWriterWorkerProcess() {
errorMessagesMap[batch[0].UUID] = errorMessage
}
}

cancel()
gateway.dbWritesStat.Count(1)

Expand Down Expand Up @@ -1496,6 +1499,38 @@ func (*HandleT) GetWebhookSourceDefName(writeKey string) (name string, ok bool)
return
}

// SaveErrors saves errors to the error db
func (gateway *HandleT) SaveWebhookFailures(reqs []*model.FailedWebhookPayload) error {
jobs := make([]*jobsdb.JobT, 0, len(reqs))
for _, req := range reqs {
params := map[string]interface{}{
"source_id": gateway.getSourceIDForWriteKey(req.WriteKey),
"stage": "webhook",
"source_type": req.SourceType,
"reason": req.Reason,
}
marshalledParams, err := json.Marshal(params)
if err != nil {
gateway.logger.Errorf("[Gateway] Failed to marshal parameters map. Parameters: %+v", params)
marshalledParams = []byte(`{"error": "rudder-server gateway failed to marshal params"}`)
}

jobs = append(jobs, &jobsdb.JobT{
UUID: uuid.New(),
UserID: uuid.New().String(), // Using a random userid for these failures. There is no notion of user id for these events.
Parameters: marshalledParams,
CustomVal: "WEBHOOK",
EventPayload: req.Payload,
EventCount: 1,
WorkspaceId: gateway.getWorkspaceForWriteKey(req.WriteKey),
})
}

ctx, cancel := context.WithTimeout(context.Background(), WriteTimeout)
defer cancel()
return gateway.errDB.Store(ctx, jobs)
}

/*
Setup initializes this module:
- Monitors backend config for changes.
Expand All @@ -1507,7 +1542,7 @@ This function will block until backend config is initially received.
*/
func (gateway *HandleT) Setup(
ctx context.Context,
application app.App, backendConfig backendconfig.BackendConfig, jobsDB jobsdb.JobsDB,
application app.App, backendConfig backendconfig.BackendConfig, jobsDB, errDB jobsdb.JobsDB,
rateLimiter throttler.Throttler, versionHandler func(w http.ResponseWriter, r *http.Request),
rsourcesService rsources.JobService, sourcehandle sourcedebugger.SourceDebugger,
) error {
Expand Down Expand Up @@ -1540,6 +1575,7 @@ func (gateway *HandleT) Setup(
gateway.batchUserWorkerBatchRequestQ = make(chan *batchUserWorkerBatchRequestT, maxDBWriterProcess)
gateway.emptyAnonIdHeaderStat = gateway.stats.NewStat("gateway.empty_anonymous_id_header", stats.CountType)
gateway.jobsDB = jobsDB
gateway.errDB = errDB

gateway.versionHandler = versionHandler

Expand Down

0 comments on commit 45a1802

Please sign in to comment.