chore: support generic rules to have routers drain events (#3856)
This adds a way to pass generic rules by which the (batch)router and processor (only jobRunID for now) can drain events. How these rules get set in config is another story.

Routers and BatchRouters now have a drainer that provides the method:
Drain(job *jobsdb.JobT) (bool, string) to check whether their jobs can be drained. Draining via configurable destination IDs ("Router.toAbortDestinationIDs") and job run IDs ("RSources.toAbortJobRunIDs") is supported now. Adding more drain configurations should be straightforward: add the corresponding logic to the drainer type and its (d *drainer) Drain(...) method.
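For orientation, here is a minimal, illustrative sketch of such a drainer. It is not the implementation from this commit: the struct layout, constructor, and the job-parameter field names (destination_id, source_job_run_id) are assumptions; only the two config keys, the Drain signature, and the reloadable loader types come from the description and diff below.

```go
package drain

import (
	"slices"

	"github.com/rudderlabs/rudder-go-kit/config"
	"github.com/rudderlabs/rudder-server/jobsdb"
	"github.com/rudderlabs/rudder-server/utils/misc"
	"github.com/tidwall/gjson"
)

// drainer holds reloadable lists of destination IDs and job run IDs whose
// jobs should be drained (aborted) instead of delivered.
type drainer struct {
	destinationIDs misc.ValueLoader[[]string] // "Router.toAbortDestinationIDs"
	jobRunIDs      misc.ValueLoader[[]string] // "RSources.toAbortJobRunIDs"
}

// NewDrainer wires the drainer to reloadable config values (sketch only).
func NewDrainer(conf *config.Config) *drainer {
	return &drainer{
		destinationIDs: conf.GetReloadableStringSliceVar([]string{}, "Router.toAbortDestinationIDs"),
		jobRunIDs:      conf.GetReloadableStringSliceVar([]string{}, "RSources.toAbortJobRunIDs"),
	}
}

// Drain reports whether the job should be drained and, if so, the reason.
// The job-parameter field names below are assumed for illustration.
func (d *drainer) Drain(job *jobsdb.JobT) (bool, string) {
	if destID := gjson.GetBytes(job.Parameters, "destination_id").String(); destID != "" &&
		slices.Contains(d.destinationIDs.Load(), destID) {
		return true, "destination configured to abort"
	}
	if jobRunID := gjson.GetBytes(job.Parameters, "source_job_run_id").String(); jobRunID != "" &&
		slices.Contains(d.jobRunIDs.Load(), jobRunID) {
		return true, "cancelled jobRunID"
	}
	return false, ""
}
```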

The processor takes a simpler approach: a new config field holds the configurations to drain by, jobRunID for now. A *config.Config is also injected into the processor; the plan is to proliferate its use instead of config.Default across the processor package (a story for another PR).
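As for how these rules might be fed in, the integration tests in this commit simply set the keys through the config package; a hypothetical equivalent for the job-run rule is sketched alongside (the IDs are made up, and the exact value shape the loader expects is an assumption):

```go
package main

import "github.com/rudderlabs/rudder-go-kit/config"

func main() {
	// From the tests in this commit: abort everything routed to destination-1.
	config.Set("Router.toAbortDestinationIDs", "destination-1")

	// Hypothetical: drain events belonging to these job runs; picked up by the
	// processor through the reloadable "RSources.toAbortJobRunIDs" value.
	config.Set("RSources.toAbortJobRunIDs", []string{"job-run-1", "job-run-2"})
}
```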
Sidddddarth committed Nov 3, 2023
1 parent 307acc3 commit 6583364
Showing 22 changed files with 513 additions and 213 deletions.
@@ -503,7 +503,7 @@ func TestReportingDroppedEvents(t *testing.T) {
gwPort, err := kithelper.GetFreePort()
require.NoError(t, err)
wg.Go(func() error {
config.Set("BatchRouter.toAbortDestinationIDs", "destination-1")
config.Set("Router.toAbortDestinationIDs", "destination-1")
err := runRudderServer(ctx, gwPort, postgresContainer, bcserver.URL, trServer.URL, t.TempDir())
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
@@ -531,7 +531,7 @@ func TestReportingErrorIndex(t *testing.T) {

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
config.Set("BatchRouter.toAbortDestinationIDs", "destination-1")
config.Set("Router.toAbortDestinationIDs", "destination-1")

err := runRudderServer(ctx, gwPort, postgresContainer, minioResource, bcServer.URL, trServer.URL, t.TempDir())
if err != nil {
22 changes: 18 additions & 4 deletions processor/manager.go
@@ -103,14 +103,28 @@ func WithFeaturesRetryMaxAttempts(maxAttempts int) func(l *LifecycleManager) {
}

// New creates a new Processor instance
func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle,
reporting types.Reporting, transientSources transientsource.Service, fileuploader fileuploader.Provider,
rsourcesService rsources.JobService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger,
func New(
ctx context.Context,
clearDb *bool,
gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle,
reporting types.Reporting,
transientSources transientsource.Service,
fileuploader fileuploader.Provider,
rsourcesService rsources.JobService,
destDebugger destinationdebugger.DestinationDebugger,
transDebugger transformationdebugger.TransformationDebugger,
enrichers []enricher.PipelineEnricher,
opts ...Opts,
) *LifecycleManager {
proc := &LifecycleManager{
Handle: NewHandle(transformer.NewTransformer(config.Default, logger.NewLogger().Child("processor"), stats.Default)),
Handle: NewHandle(
config.Default,
transformer.NewTransformer(
config.Default,
logger.NewLogger().Child("processor"),
stats.Default,
),
),
mainCtx: ctx,
gatewayDB: gwDb,
routerDB: rtDb,
98 changes: 82 additions & 16 deletions processor/processor.go
@@ -62,14 +62,15 @@ const (

var jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary

func NewHandle(transformer transformer.Transformer) *Handle {
h := &Handle{transformer: transformer}
func NewHandle(c *config.Config, transformer transformer.Transformer) *Handle {
h := &Handle{transformer: transformer, conf: c}
h.loadConfig()
return h
}

// Handle is a handle to the processor module
type Handle struct {
conf *config.Config
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
lastJobID int64
@@ -145,6 +146,10 @@ type Handle struct {
eventAuditEnabled map[string]bool
}

drainConfig struct {
jobRunIDs misc.ValueLoader[[]string]
}

adaptiveLimit func(int64) int64
storePlocker kitsync.PartitionLocker
}
@@ -351,8 +356,11 @@ func (proc *Handle) newEventFilterStat(sourceID, workspaceID string, destination

// Setup initializes the module
func (proc *Handle) Setup(
backendConfig backendconfig.BackendConfig, gatewayDB, routerDB,
batchRouterDB, readErrorDB, writeErrorDB, eventSchemaDB, archivalDB jobsdb.JobsDB, reporting types.Reporting,
backendConfig backendconfig.BackendConfig,
gatewayDB, routerDB, batchRouterDB,
readErrorDB, writeErrorDB,
eventSchemaDB, archivalDB jobsdb.JobsDB,
reporting types.Reporting,
transientSources transientsource.Service,
fileuploader fileuploader.Provider,
rsourcesService rsources.JobService,
@@ -364,6 +372,9 @@ func (proc *Handle) Setup(
proc.destDebugger = destDebugger
proc.transDebugger = transDebugger
proc.reportingEnabled = config.GetBoolVar(types.DefaultReportingEnabled, "Reporting.enabled")
if proc.conf == nil {
proc.conf = config.Default
}
proc.setupReloadableVars()
proc.logger = logger.NewLogger().Child("processor")
proc.backendConfig = backendConfig
@@ -483,9 +494,10 @@ func (proc *Handle) Setup(
}

func (proc *Handle) setupReloadableVars() {
proc.jobdDBQueryRequestTimeout = config.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.QueryRequestTimeout", "JobsDB.QueryRequestTimeout")
proc.jobsDBCommandTimeout = config.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.CommandRequestTimeout", "JobsDB.CommandRequestTimeout")
proc.jobdDBMaxRetries = config.GetReloadableIntVar(2, 1, "JobsDB.Processor.MaxRetries", "JobsDB.MaxRetries")
proc.jobdDBQueryRequestTimeout = proc.conf.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.QueryRequestTimeout", "JobsDB.QueryRequestTimeout")
proc.jobsDBCommandTimeout = proc.conf.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.CommandRequestTimeout", "JobsDB.CommandRequestTimeout")
proc.jobdDBMaxRetries = proc.conf.GetReloadableIntVar(2, 1, "JobsDB.Processor.MaxRetries", "JobsDB.MaxRetries")
proc.drainConfig.jobRunIDs = proc.conf.GetReloadableStringSliceVar([]string{}, "RSources.toAbortJobRunIDs")
}

// Start starts this processor's main loops.
@@ -2328,7 +2340,24 @@ func (proc *Handle) transformSrcDest(
s := time.Now()
eventFilterInCount := len(eventsToTransform)
proc.logger.Debug("Supported messages filtering input size", eventFilterInCount)
response = ConvertToFilteredTransformerResponse(eventsToTransform, transformAt != "none")
response = ConvertToFilteredTransformerResponse(
eventsToTransform,
transformAt != "none",
func(event transformer.TransformerEvent) (bool, string) {
if event.Metadata.SourceJobRunID != "" &&
slices.Contains(
proc.drainConfig.jobRunIDs.Load(),
event.Metadata.SourceJobRunID,
) {
proc.logger.Debugf(
"cancelled jobRunID: %s",
event.Metadata.SourceJobRunID,
)
return true, "cancelled jobRunId"
}
return false, ""
},
)
var successMetrics []*types.PUReportedMetric
var successCountMap map[string]int64
var successCountMetadataMap map[string]MetricMetadata
@@ -2544,7 +2573,11 @@ func (proc *Handle) saveDroppedJobs(droppedJobs []*jobsdb.JobT, tx *Tx) error {
return nil
}

func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent, filter bool) transformer.Response {
func ConvertToFilteredTransformerResponse(
events []transformer.TransformerEvent,
filter bool,
drainFunc func(transformer.TransformerEvent) (bool, string),
) transformer.Response {
var responses []transformer.TransformerResponse
var failedEvents []transformer.TransformerResponse

@@ -2556,10 +2589,23 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent,
supportedMessageEventsCache := make(map[string]*cacheValue)

// filter unsupported message types
var resp transformer.TransformerResponse
for i := range events {
event := &events[i]

// drain events
if drain, reason := drainFunc(*event); drain {
failedEvents = append(
failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: types.DrainEventCode,
Metadata: event.Metadata,
Error: reason,
},
)
continue
}

if filter {
// filter unsupported message types
supportedTypes, ok := supportedMessageTypesCache[event.Destination.ID]
@@ -2589,21 +2635,41 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent,
messageEvent, typOk := event.Message["event"].(string)
if !typOk {
// add to FailedEvents
resp = transformer.TransformerResponse{Output: event.Message, StatusCode: 400, Metadata: event.Metadata, Error: "Invalid message event. Type assertion failed"}
failedEvents = append(failedEvents, resp)
failedEvents = append(
failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: 400,
Metadata: event.Metadata,
Error: "Invalid message event. Type assertion failed",
},
)
continue
}
if !slices.Contains(supportedEvents.values, messageEvent) {
resp = transformer.TransformerResponse{Output: event.Message, StatusCode: types.FilterEventCode, Metadata: event.Metadata, Error: "Event not supported"}
failedEvents = append(failedEvents, resp)
failedEvents = append(
failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: types.FilterEventCode,
Metadata: event.Metadata,
Error: "Event not supported",
},
)
continue
}
}

}
// allow event
resp = transformer.TransformerResponse{Output: event.Message, StatusCode: 200, Metadata: event.Metadata}
responses = append(responses, resp)
responses = append(
responses,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: 200,
Metadata: event.Metadata,
},
)
}

return transformer.Response{Events: responses, FailedEvents: failedEvents}
