chore: support generic rules to have routers drain events #3856

Merged · 11 commits · Nov 3, 2023
@@ -503,7 +503,7 @@ func TestReportingDroppedEvents(t *testing.T) {
gwPort, err := kithelper.GetFreePort()
require.NoError(t, err)
wg.Go(func() error {
config.Set("BatchRouter.toAbortDestinationIDs", "destination-1")
config.Set("Router.toAbortDestinationIDs", "destination-1")
err := runRudderServer(ctx, gwPort, postgresContainer, bcserver.URL, trServer.URL, t.TempDir())
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
@@ -531,7 +531,7 @@ func TestReportingErrorIndex(t *testing.T) {

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
config.Set("BatchRouter.toAbortDestinationIDs", "destination-1")
config.Set("Router.toAbortDestinationIDs", "destination-1")

err := runRudderServer(ctx, gwPort, postgresContainer, minioResource, bcServer.URL, trServer.URL, t.TempDir())
if err != nil {
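Both test hunks above configure `Router.toAbortDestinationIDs` before starting the embedded server. A minimal sketch of how such a comma-separated destination list can be set and read back with rudder-go-kit's config package (the reader side here is illustrative, not the server's actual abort handling):

```go
package main

import (
	"fmt"
	"slices"
	"strings"

	"github.com/rudderlabs/rudder-go-kit/config"
)

func main() {
	c := config.New()
	// The tests set this key before booting rudder-server.
	c.Set("Router.toAbortDestinationIDs", "destination-1,destination-2")

	// Illustrative reader side: split the comma-separated list and check membership.
	ids := strings.Split(c.GetString("Router.toAbortDestinationIDs", ""), ",")
	fmt.Println(slices.Contains(ids, "destination-1")) // true
}
```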
22 changes: 18 additions & 4 deletions processor/manager.go
@@ -103,14 +103,28 @@ func WithFeaturesRetryMaxAttempts(maxAttempts int) func(l *LifecycleManager) {
}

// New creates a new Processor instance
func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle,
reporting types.Reporting, transientSources transientsource.Service, fileuploader fileuploader.Provider,
rsourcesService rsources.JobService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger,
func New(
ctx context.Context,
clearDb *bool,
gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle,
reporting types.Reporting,
transientSources transientsource.Service,
fileuploader fileuploader.Provider,
rsourcesService rsources.JobService,
destDebugger destinationdebugger.DestinationDebugger,
transDebugger transformationdebugger.TransformationDebugger,
enrichers []enricher.PipelineEnricher,
opts ...Opts,
) *LifecycleManager {
proc := &LifecycleManager{
Handle: NewHandle(transformer.NewTransformer(config.Default, logger.NewLogger().Child("processor"), stats.Default)),
Handle: NewHandle(
config.Default,
transformer.NewTransformer(
config.Default,
logger.NewLogger().Child("processor"),
stats.Default,
),
),
mainCtx: ctx,
gatewayDB: gwDb,
routerDB: rtDb,
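With the new `*config.Config` parameter, the handle's configuration is injectable instead of being tied to `config.Default`. A sketch of a caller wiring its own config instance through, so reloadable keys can be set in isolation (the standalone `main` wrapper and the key value are illustrative):

```go
package main

import (
	"github.com/rudderlabs/rudder-go-kit/config"
	"github.com/rudderlabs/rudder-go-kit/logger"
	"github.com/rudderlabs/rudder-go-kit/stats"

	"github.com/rudderlabs/rudder-server/processor"
	"github.com/rudderlabs/rudder-server/processor/transformer"
)

func main() {
	// A dedicated config instance: keys such as RSources.toAbortJobRunIDs
	// can be set here without touching the process-wide config.Default.
	c := config.New()
	c.Set("RSources.toAbortJobRunIDs", []string{"some-job-run-id"})

	h := processor.NewHandle(
		c,
		transformer.NewTransformer(c, logger.NewLogger().Child("processor"), stats.Default),
	)
	_ = h // wired into the rest of the processor lifecycle elsewhere
}
```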
98 changes: 82 additions & 16 deletions processor/processor.go
@@ -62,14 +62,15 @@

var jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary

func NewHandle(transformer transformer.Transformer) *Handle {
h := &Handle{transformer: transformer}
func NewHandle(c *config.Config, transformer transformer.Transformer) *Handle {
h := &Handle{transformer: transformer, conf: c}
h.loadConfig()
return h
}

// Handle is a handle to the processor module
type Handle struct {
conf *config.Config
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
lastJobID int64
@@ -145,6 +146,10 @@
eventAuditEnabled map[string]bool
}

drainConfig struct {
jobRunIDs misc.ValueLoader[[]string]
}

adaptiveLimit func(int64) int64
storePlocker kitsync.PartitionLocker
}
@@ -351,8 +356,11 @@

// Setup initializes the module
func (proc *Handle) Setup(
backendConfig backendconfig.BackendConfig, gatewayDB, routerDB,
batchRouterDB, readErrorDB, writeErrorDB, eventSchemaDB, archivalDB jobsdb.JobsDB, reporting types.Reporting,
backendConfig backendconfig.BackendConfig,
gatewayDB, routerDB, batchRouterDB,
readErrorDB, writeErrorDB,
eventSchemaDB, archivalDB jobsdb.JobsDB,
reporting types.Reporting,
transientSources transientsource.Service,
fileuploader fileuploader.Provider,
rsourcesService rsources.JobService,
@@ -364,6 +372,9 @@
proc.destDebugger = destDebugger
proc.transDebugger = transDebugger
proc.reportingEnabled = config.GetBoolVar(types.DefaultReportingEnabled, "Reporting.enabled")
if proc.conf == nil {
proc.conf = config.Default
}

proc.setupReloadableVars()
proc.logger = logger.NewLogger().Child("processor")
proc.backendConfig = backendConfig
@@ -483,9 +494,10 @@
}

func (proc *Handle) setupReloadableVars() {
proc.jobdDBQueryRequestTimeout = config.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.QueryRequestTimeout", "JobsDB.QueryRequestTimeout")
proc.jobsDBCommandTimeout = config.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.CommandRequestTimeout", "JobsDB.CommandRequestTimeout")
proc.jobdDBMaxRetries = config.GetReloadableIntVar(2, 1, "JobsDB.Processor.MaxRetries", "JobsDB.MaxRetries")
proc.jobdDBQueryRequestTimeout = proc.conf.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.QueryRequestTimeout", "JobsDB.QueryRequestTimeout")
proc.jobsDBCommandTimeout = proc.conf.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.CommandRequestTimeout", "JobsDB.CommandRequestTimeout")
proc.jobdDBMaxRetries = proc.conf.GetReloadableIntVar(2, 1, "JobsDB.Processor.MaxRetries", "JobsDB.MaxRetries")
proc.drainConfig.jobRunIDs = proc.conf.GetReloadableStringSliceVar([]string{}, "RSources.toAbortJobRunIDs")
}
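The job-run-ID drain list is registered as a reloadable value, so changes to `RSources.toAbortJobRunIDs` take effect on the next `Load()` without a restart. A small sketch of that behaviour, assuming rudder-go-kit's reloadable-config semantics:

```go
package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-go-kit/config"
)

func main() {
	c := config.New()
	// Same registration pattern as setupReloadableVars above.
	jobRunIDs := c.GetReloadableStringSliceVar([]string{}, "RSources.toAbortJobRunIDs")

	fmt.Println(jobRunIDs.Load()) // [] until the key is set

	// Updating the key is reflected on the next Load, no restart required.
	c.Set("RSources.toAbortJobRunIDs", []string{"run-1", "run-2"})
	fmt.Println(jobRunIDs.Load()) // [run-1 run-2]
}
```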

// Start starts this processor's main loops.
@@ -2328,7 +2340,24 @@
s := time.Now()
eventFilterInCount := len(eventsToTransform)
proc.logger.Debug("Supported messages filtering input size", eventFilterInCount)
response = ConvertToFilteredTransformerResponse(eventsToTransform, transformAt != "none")
response = ConvertToFilteredTransformerResponse(
eventsToTransform,
transformAt != "none",
func(event transformer.TransformerEvent) (bool, string) {
if event.Metadata.SourceJobRunID != "" &&
slices.Contains(
proc.drainConfig.jobRunIDs.Load(),
event.Metadata.SourceJobRunID,
) {
proc.logger.Debugf(
"cancelled jobRunID: %s",
event.Metadata.SourceJobRunID,
)
return true, "cancelled jobRunId"
}
return false, ""
},
)
var successMetrics []*types.PUReportedMetric
var successCountMap map[string]int64
var successCountMetadataMap map[string]MetricMetadata
@@ -2544,7 +2573,11 @@
return nil
}

func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent, filter bool) transformer.Response {
func ConvertToFilteredTransformerResponse(
events []transformer.TransformerEvent,
filter bool,
drainFunc func(transformer.TransformerEvent) (bool, string),
) transformer.Response {
var responses []transformer.TransformerResponse
var failedEvents []transformer.TransformerResponse

@@ -2556,10 +2589,23 @@
supportedMessageEventsCache := make(map[string]*cacheValue)

// filter unsupported message types
var resp transformer.TransformerResponse
for i := range events {
event := &events[i]

// drain events
if drain, reason := drainFunc(*event); drain {
failedEvents = append(
failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: types.DrainEventCode,
Metadata: event.Metadata,
Error: reason,
},
)
continue
}

if filter {
// filter unsupported message types
supportedTypes, ok := supportedMessageTypesCache[event.Destination.ID]
@@ -2589,21 +2635,41 @@
messageEvent, typOk := event.Message["event"].(string)
if !typOk {
// add to FailedEvents
resp = transformer.TransformerResponse{Output: event.Message, StatusCode: 400, Metadata: event.Metadata, Error: "Invalid message event. Type assertion failed"}
failedEvents = append(failedEvents, resp)
failedEvents = append(
failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: 400,
Metadata: event.Metadata,
Error: "Invalid message event. Type assertion failed",
},
)
continue
}
if !slices.Contains(supportedEvents.values, messageEvent) {
resp = transformer.TransformerResponse{Output: event.Message, StatusCode: types.FilterEventCode, Metadata: event.Metadata, Error: "Event not supported"}
failedEvents = append(failedEvents, resp)
failedEvents = append(
failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: types.FilterEventCode,
Metadata: event.Metadata,
Error: "Event not supported",
},
)
continue
}
}

}
// allow event
resp = transformer.TransformerResponse{Output: event.Message, StatusCode: 200, Metadata: event.Metadata}
responses = append(responses, resp)
responses = append(
responses,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: 200,
Metadata: event.Metadata,
},
)
}

return transformer.Response{Events: responses, FailedEvents: failedEvents}
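Taken together, `ConvertToFilteredTransformerResponse` now takes a drain predicate alongside the filter flag, and any event the predicate flags is moved to `FailedEvents` with `types.DrainEventCode`. A standalone sketch of calling it with such a predicate (the event payloads and the hard-coded drain list are illustrative; in the processor the list comes from the reloadable `RSources.toAbortJobRunIDs` value):

```go
package main

import (
	"fmt"
	"slices"

	"github.com/rudderlabs/rudder-server/processor"
	"github.com/rudderlabs/rudder-server/processor/transformer"
)

func main() {
	toAbort := []string{"run-2"} // illustrative drain list

	events := []transformer.TransformerEvent{
		{Message: map[string]interface{}{"type": "track"}, Metadata: transformer.Metadata{SourceJobRunID: "run-1"}},
		{Message: map[string]interface{}{"type": "track"}, Metadata: transformer.Metadata{SourceJobRunID: "run-2"}},
	}

	resp := processor.ConvertToFilteredTransformerResponse(
		events,
		false, // skip message-type filtering in this sketch
		func(e transformer.TransformerEvent) (bool, string) {
			if e.Metadata.SourceJobRunID != "" && slices.Contains(toAbort, e.Metadata.SourceJobRunID) {
				return true, "cancelled jobRunId"
			}
			return false, ""
		},
	)

	fmt.Println(len(resp.Events), len(resp.FailedEvents)) // 1 allowed, 1 drained -> prints: 1 1
}
```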