fixup! chore: update processor logic to accomodate jobRunID aborts
Sidddddarth committed Oct 11, 2023
1 parent 109d7e2 commit 1ecff8a
Showing 3 changed files with 174 additions and 119 deletions.
38 changes: 32 additions & 6 deletions processor/manager.go
@@ -54,8 +54,20 @@ func (proc *LifecycleManager) Start() error {
}

proc.Handle.Setup(
proc.BackendConfig, proc.gatewayDB, proc.routerDB, proc.batchRouterDB, proc.readErrDB, proc.writeErrDB, proc.esDB, proc.arcDB,
proc.ReportingI, proc.transientSources, proc.fileuploader, proc.rsourcesService, proc.destDebugger, proc.transDebugger,
proc.BackendConfig,
proc.gatewayDB,
proc.routerDB,
proc.batchRouterDB,
proc.readErrDB,
proc.writeErrDB,
proc.esDB,
proc.arcDB,
proc.ReportingI,
proc.transientSources,
proc.fileuploader,
proc.rsourcesService,
proc.destDebugger,
proc.transDebugger,
)

currentCtx, cancel := context.WithCancel(context.Background())
@@ -91,13 +103,27 @@ func WithFeaturesRetryMaxAttempts(maxAttempts int) func(l *LifecycleManager) {
}

// New creates a new Processor instance
func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle,
reporting types.Reporting, transientSources transientsource.Service, fileuploader fileuploader.Provider,
rsourcesService rsources.JobService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger,
func New(
ctx context.Context,
clearDb *bool,
gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle,
reporting types.Reporting,
transientSources transientsource.Service,
fileuploader fileuploader.Provider,
rsourcesService rsources.JobService,
destDebugger destinationdebugger.DestinationDebugger,
transDebugger transformationdebugger.TransformationDebugger,
opts ...Opts,
) *LifecycleManager {
proc := &LifecycleManager{
Handle: NewHandle(transformer.NewTransformer(config.Default, logger.NewLogger().Child("processor"), stats.Default)),
Handle: NewHandle(
config.Default,
transformer.NewTransformer(
config.Default,
logger.NewLogger().Child("processor"),
stats.Default,
),
),
mainCtx: ctx,
gatewayDB: gwDb,
routerDB: rtDb,
61 changes: 36 additions & 25 deletions processor/processor.go
@@ -60,14 +60,15 @@ const (

var jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary

func NewHandle(transformer transformer.Transformer) *Handle {
h := &Handle{transformer: transformer}
func NewHandle(c *config.Config, transformer transformer.Transformer) *Handle {
h := &Handle{transformer: transformer, conf: c}
h.loadConfig()
return h
}

// Handle is a handle to the processor module
type Handle struct {
conf *config.Config
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
lastJobID int64
@@ -142,6 +143,10 @@ type Handle struct {
eventAuditEnabled map[string]bool
}

drainConfig struct {
jobRunIDs misc.ValueLoader[[]string]
}

adaptiveLimit func(int64) int64
storePlocker kitsync.PartitionLocker
}
@@ -348,15 +353,24 @@ func (proc *Handle) newEventFilterStat(sourceID, workspaceID string, destination

// Setup initializes the module
func (proc *Handle) Setup(
backendConfig backendconfig.BackendConfig, gatewayDB, routerDB,
batchRouterDB, readErrorDB, writeErrorDB, eventSchemaDB, archivalDB jobsdb.JobsDB, reporting types.Reporting,
backendConfig backendconfig.BackendConfig,
gatewayDB, routerDB, batchRouterDB,
readErrorDB, writeErrorDB,
eventSchemaDB, archivalDB jobsdb.JobsDB,
reporting types.Reporting,
transientSources transientsource.Service,
fileuploader fileuploader.Provider, rsourcesService rsources.JobService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger,
fileuploader fileuploader.Provider,
rsourcesService rsources.JobService,
destDebugger destinationdebugger.DestinationDebugger,
transDebugger transformationdebugger.TransformationDebugger,
) {
proc.reporting = reporting
proc.destDebugger = destDebugger
proc.transDebugger = transDebugger
proc.reportingEnabled = config.GetBoolVar(types.DefaultReportingEnabled, "Reporting.enabled")
if proc.conf == nil {
proc.conf = config.Default
}

[Codecov / codecov/patch — check warning: added lines processor/processor.go#L372-L373 were not covered by tests]
proc.setupReloadableVars()
proc.logger = logger.NewLogger().Child("processor")
proc.backendConfig = backendConfig
@@ -475,9 +489,10 @@ func (proc *Handle) Setup(
}

func (proc *Handle) setupReloadableVars() {
proc.jobdDBQueryRequestTimeout = config.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.QueryRequestTimeout", "JobsDB.QueryRequestTimeout")
proc.jobsDBCommandTimeout = config.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.CommandRequestTimeout", "JobsDB.CommandRequestTimeout")
proc.jobdDBMaxRetries = config.GetReloadableIntVar(2, 1, "JobsDB.Processor.MaxRetries", "JobsDB.MaxRetries")
proc.jobdDBQueryRequestTimeout = proc.conf.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.QueryRequestTimeout", "JobsDB.QueryRequestTimeout")
proc.jobsDBCommandTimeout = proc.conf.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.CommandRequestTimeout", "JobsDB.CommandRequestTimeout")
proc.jobdDBMaxRetries = proc.conf.GetReloadableIntVar(2, 1, "JobsDB.Processor.MaxRetries", "JobsDB.MaxRetries")
proc.drainConfig.jobRunIDs = proc.conf.GetReloadableStringSliceVar([]string{}, "RSources.toAbortJobRunIDs")
}

// Start starts this processor's main loops.
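
The drain list registered above is a reloadable value: each Load() call returns the current contents of RSources.toAbortJobRunIDs, so the list can be updated at runtime via a config reload without restarting the processor. Below is a minimal sketch of the membership check this commit adds in processJobsForDest (shown in the next hunk); the shouldDrain helper name is hypothetical, and only the Load method and slices.Contains visible in this diff are assumed:

// Hypothetical helper, not part of this commit; mirrors the check in processJobsForDest.
func shouldDrain(jobRunIDs interface{ Load() []string }, sourceJobRunID string) bool {
	ids := jobRunIDs.Load() // reloadable: reflects the latest RSources.toAbortJobRunIDs
	return len(ids) > 0 && slices.Contains(ids, sourceJobRunID)
}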
@@ -1595,6 +1610,19 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
if !proc.isDestinationAvailable(singularEvent, sourceId) {
continue
}
// check if jobRunId is cancelled
if len(proc.drainConfig.jobRunIDs.Load()) > 0 {
if slices.Contains(
proc.drainConfig.jobRunIDs.Load(),
commonMetadataFromSingularEvent.SourceJobRunID,
) {
proc.logger.Debugf(
"cancelled jobRunID: %s",
commonMetadataFromSingularEvent.SourceJobRunID,
)
continue
}
}

if _, ok := groupedEventsBySourceId[SourceIDT(sourceId)]; !ok {
groupedEventsBySourceId[SourceIDT(sourceId)] = make([]transformer.TransformerEvent, 0)
@@ -2582,27 +2610,10 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent,
supportedMessageTypesCache := make(map[string]*cacheValue)
supportedMessageEventsCache := make(map[string]*cacheValue)

toAbortJobRunIDs := config.GetStringVar("", "RSources.toAbortJobRunIDs")

// filter unsupported message types
for i := range events {
event := &events[i]

if toAbortJobRunIDs != "" {
abortIDs := strings.Split(toAbortJobRunIDs, ",")
if slices.Contains(abortIDs, event.Metadata.SourceJobRunID) {
failedEvents = append(failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: 400,
Metadata: event.Metadata,
Error: "jobRunID configured to abort",
},
)
continue
}
}

if filter {
// filter unsupported message types
supportedTypes, ok := supportedMessageTypesCache[event.Destination.ID]
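
For comparison, the removed block above shows the earlier approach: ConvertToFilteredTransformerResponse read RSources.toAbortJobRunIDs as a single comma-separated string and failed matching events with a 400 "jobRunID configured to abort" response. After this commit the key is a reloadable string slice that processJobsForDest consults up front, skipping matching events with only a debug log. A small, self-contained sketch of exercising the new key follows; the config import path and the New/Set helpers are assumed from rudder-go-kit, and the values are hypothetical:

package main

import (
	"fmt"
	"slices"

	"github.com/rudderlabs/rudder-go-kit/config" // import path assumed
)

func main() {
	c := config.New()
	// Hypothetical value; the key is the one this commit introduces.
	c.Set("RSources.toAbortJobRunIDs", []string{"job-run-1"})

	// Same registration call the processor makes in setupReloadableVars.
	drainList := c.GetReloadableStringSliceVar([]string{}, "RSources.toAbortJobRunIDs")

	// Events whose SourceJobRunID is in the list would be skipped by processJobsForDest.
	fmt.Println(slices.Contains(drainList.Load(), "job-run-1")) // true
}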