Skip to content

Commit

Permalink
chore: support generic rules to have routers drain events
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Sep 15, 2023
1 parent 12f4d83 commit 1b80237
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 49 deletions.
7 changes: 4 additions & 3 deletions router/batchrouter/batchrouterBenchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
routerutils "github.com/rudderlabs/rudder-server/router/utils"
)

func Benchmark_GetStorageDateFormat(b *testing.B) {
Expand Down Expand Up @@ -47,7 +48,7 @@ func Benchmark_JSONUnmarshal(b *testing.B) {
for i := 0; i < b.N; i++ {
var jobs []*jobsdb.JobT
for i := 0; i < 100; i++ {
params := JobParameters{
params := routerutils.JobParameters{
EventName: "test",
EventType: "track",
MessageID: uuid.New().String(),
Expand All @@ -64,7 +65,7 @@ func Benchmark_JSONUnmarshal(b *testing.B) {

g := errgroup.Group{}
g.Go(func() error {
params := JobParameters{
params := routerutils.JobParameters{
EventName: "test",
EventType: "track",
MessageID: uuid.New().String(),
Expand All @@ -84,7 +85,7 @@ func Benchmark_JSONUnmarshal(b *testing.B) {
})
g.Go(func() error {
for i := range jobs {
var params JobParameters
var params routerutils.JobParameters
_ = json.Unmarshal(jobs[i].Parameters, &params)
}

Expand Down
4 changes: 2 additions & 2 deletions router/batchrouter/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (brt *Handle) pingWarehouse(batchJobs *BatchedJobs, output UploadResult) (e
}
}
}
var sampleParameters JobParameters
var sampleParameters router_utils.JobParameters
err = json.Unmarshal(batchJobs.Jobs[0].Parameters, &sampleParameters)
if err != nil {
brt.logger.Error("Unmarshal of job parameters failed in postToWarehouse function. ", string(batchJobs.Jobs[0].Parameters))
Expand Down Expand Up @@ -610,7 +610,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
errorResp = []byte(errorRespString)
}

var parameters JobParameters
var parameters router_utils.JobParameters
err = json.Unmarshal(job.Parameters, &parameters)
if err != nil {
brt.logger.Error("Unmarshal of job parameters failed. ", string(job.Parameters))
Expand Down
2 changes: 1 addition & 1 deletion router/batchrouter/handle_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersM
statusDetailsMap := make(map[string]*utilTypes.StatusDetail)
routerWorkspaceJobStatusCount := make(map[string]int)
for _, status := range statusList {
var parameters JobParameters
var parameters routerutils.JobParameters

Check warning on line 512 in router/batchrouter/handle_async.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/handle_async.go#L512

Added line #L512 was not covered by tests
err := json.Unmarshal(parametersMap[status.JobID], &parameters)
if err != nil {
brt.logger.Error("Unmarshal of job parameters failed. ", string(parametersMap[status.JobID]))
Expand Down
16 changes: 0 additions & 16 deletions router/batchrouter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,6 @@ type Connection struct {
Destination backendconfig.DestinationT
}

// JobParameters is the batch router's typed view of the JSON stored in a
// job's Parameters field; it is filled with json.Unmarshal. A key that is
// absent from the JSON leaves the matching field as an empty string.
type JobParameters struct {
SourceID string `json:"source_id"`
DestinationID string `json:"destination_id"`
ReceivedAt string `json:"received_at"`
TransformAt string `json:"transform_at"`
SourceTaskRunID string `json:"source_task_run_id"`
SourceJobID string `json:"source_job_id"`
SourceJobRunID string `json:"source_job_run_id"`
SourceDefinitionID string `json:"source_definition_id"`
DestinationDefinitionID string `json:"destination_definition_id"`
SourceCategory string `json:"source_category"`
EventName string `json:"event_name"`
EventType string `json:"event_type"`
MessageID string `json:"message_id"`
}

type DestinationJobs struct {
destWithSources router_utils.DestinationWithSources
jobs []*jobsdb.JobT
Expand Down
7 changes: 4 additions & 3 deletions router/batchrouter/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/samber/lo"
"github.com/tidwall/gjson"
"golang.org/x/exp/slices"

"github.com/rudderlabs/rudder-go-kit/logger"
Expand Down Expand Up @@ -75,7 +74,9 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin

jobsBySource := make(map[string][]*jobsdb.JobT)
for _, job := range destinationJobs.jobs {
if drain, reason := router_utils.ToBeDrained(job, destWithSources.Destination.ID, brt.toAbortDestinationIDs.Load(), destinationsMap); drain {
var params router_utils.JobParameters
_ = json.Unmarshal(job.Parameters, &params)
if drain, reason := router_utils.ToBeDrained(job, params, router_utils.AbortConfigs{ToAbortDestinationIDs: brt.toAbortDestinationIDs.Load()}, destinationsMap); drain {
status := jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum + 1,
Expand Down Expand Up @@ -105,7 +106,7 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin
drainStatsbyDest[destWithSources.Destination.ID].Reasons = append(drainStatsbyDest[destWithSources.Destination.ID].Reasons, reason)
}
} else {
sourceID := gjson.GetBytes(job.Parameters, "source_id").String()
sourceID := params.SourceID
if _, ok := jobsBySource[sourceID]; !ok {
jobsBySource[sourceID] = []*jobsdb.JobT{}
}
Expand Down
6 changes: 3 additions & 3 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
var statusList []*jobsdb.JobStatusT
var routerAbortedJobs []*jobsdb.JobT
for _, workerJobStatus := range *workerJobStatuses {
var parameters JobParameters
var parameters routerutils.JobParameters
err := json.Unmarshal(workerJobStatus.job.Parameters, &parameters)
if err != nil {
rt.logger.Error("Unmarshal of job parameters failed. ", string(workerJobStatus.job.Parameters))
Expand Down Expand Up @@ -476,7 +476,7 @@ func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrd
return nil, types.ErrContextCancelled
}

var parameters JobParameters
var parameters routerutils.JobParameters
if err := json.Unmarshal(job.Parameters, &parameters); err != nil {
rt.logger.Errorf(`[%v Router] :: Unmarshalling parameters failed with the error %v . Returning nil worker`, err)
return nil, types.ErrParamsUnmarshal
Expand Down Expand Up @@ -547,7 +547,7 @@ func (*Handle) shouldBackoff(job *jobsdb.JobT) bool {
return job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0
}

func (rt *Handle) shouldThrottle(job *jobsdb.JobT, parameters JobParameters) (limited bool) {
func (rt *Handle) shouldThrottle(job *jobsdb.JobT, parameters routerutils.JobParameters) (limited bool) {
if rt.throttlerFactory == nil {
// throttlerFactory could be nil when throttling is disabled or misconfigured.
// in case of misconfiguration, logging errors are emitted.
Expand Down
18 changes: 0 additions & 18 deletions router/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,6 @@ import (
"github.com/rudderlabs/rudder-server/router/types"
)

// JobParameters is the router's typed view of the JSON stored in a job's
// Parameters field. It is populated with json.Unmarshal; any key missing from
// the JSON leaves its field at the zero value. RecordID is deliberately
// untyped, so it keeps whatever JSON value was supplied.
type JobParameters struct {
	SourceID                string `json:"source_id"`
	DestinationID           string `json:"destination_id"`
	ReceivedAt              string `json:"received_at"`
	TransformAt             string `json:"transform_at"`
	SourceTaskRunID         string `json:"source_task_run_id"`
	SourceJobID             string `json:"source_job_id"`
	SourceJobRunID          string `json:"source_job_run_id"`
	SourceDefinitionID      string `json:"source_definition_id"`
	DestinationDefinitionID string `json:"destination_definition_id"`
	SourceCategory          string `json:"source_category"`
	RecordID                any    `json:"record_id"`
	MessageID               string `json:"message_id"`
	WorkspaceID             string `json:"workspaceId"`
	RudderAccountID         string `json:"rudderAccountId"`
}

type workerJobStatus struct {
userID string
worker *worker
Expand Down
36 changes: 35 additions & 1 deletion router/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,37 @@ func getRetentionTimeForDestination(destID string) time.Duration {
return config.GetDurationVar(720, time.Hour, "Router."+destID+".jobRetention", "Router.jobRetention")
}

func ToBeDrained(job *jobsdb.JobT, destID, toAbortDestinationIDs string, destinationsMap map[string]*DestinationWithSources) (bool, string) {
// JobParameters is the typed view of the JSON stored in a job's Parameters
// field, shared by the router and the batch router. It is populated with
// json.Unmarshal; any key missing from the JSON leaves its field at the zero
// value. RecordID is deliberately untyped, so it keeps whatever JSON value
// was supplied.
type JobParameters struct {
	SourceID                string `json:"source_id"`
	DestinationID           string `json:"destination_id"`
	ReceivedAt              string `json:"received_at"`
	TransformAt             string `json:"transform_at"`
	SourceTaskRunID         string `json:"source_task_run_id"`
	SourceJobID             string `json:"source_job_id"`
	SourceJobRunID          string `json:"source_job_run_id"`
	SourceDefinitionID      string `json:"source_definition_id"`
	DestinationDefinitionID string `json:"destination_definition_id"`
	SourceCategory          string `json:"source_category"`
	RecordID                any    `json:"record_id"`
	MessageID               string `json:"message_id"`
	EventName               string `json:"event_name"`
	EventType               string `json:"event_type"`
	WorkspaceID             string `json:"workspaceId"`
	RudderAccountID         string `json:"rudderAccountId"`
}

// AbortConfigs bundles the abort settings that callers hand to ToBeDrained.
// Only the destination list exists today.
type AbortConfigs struct {
// ToAbortDestinationIDs is a comma-separated list of destination IDs; an
// empty string means no destination is configured to abort.
ToAbortDestinationIDs string
// NOTE(review): the entries below look like placeholders for further
// abort criteria that are not implemented yet — confirm before relying on them.
// source
// connection
// jobRunID
// ...
}

func ToBeDrained(job *jobsdb.JobT, jobParams JobParameters, abortConfig AbortConfigs, destinationsMap map[string]*DestinationWithSources) (bool, string) {
// drain if job is older than the destination's retention time
createdAt := job.CreatedAt
destID := jobParams.DestinationID
if time.Since(createdAt) > getRetentionTimeForDestination(destID) {
return true, "job expired"
}
Expand All @@ -58,13 +86,19 @@ func ToBeDrained(job *jobsdb.JobT, destID, toAbortDestinationIDs string, destina
return true, "destination is disabled"
}

toAbortDestinationIDs := abortConfig.ToAbortDestinationIDs
if toAbortDestinationIDs != "" {
abortIDs := strings.Split(toAbortDestinationIDs, ",")
if slices.Contains(abortIDs, destID) {
return true, "destination configured to abort"
}
}

// toAbortSourceIDS
// toAbortConnection(sourceID, destID)
// toAbortJobRunID/TaskRunID
// ...

return false, ""
}

Expand Down
4 changes: 2 additions & 2 deletions router/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ func (w *worker) workLoop() {
job := message.job
userID := job.UserID

var parameters JobParameters
var parameters routerutils.JobParameters
if err := json.Unmarshal(job.Parameters, &parameters); err != nil {
panic(fmt.Errorf("unmarshalling of job parameters failed for job %d (%s): %w", job.JobID, string(job.Parameters), err))
}
w.rt.destinationsMapMu.RLock()
abort, abortReason := routerutils.ToBeDrained(job, parameters.DestinationID, w.rt.reloadableConfig.toAbortDestinationIDs.Load(), w.rt.destinationsMap)
abort, abortReason := routerutils.ToBeDrained(job, parameters, routerutils.AbortConfigs{ToAbortDestinationIDs: w.rt.reloadableConfig.toAbortDestinationIDs.Load()}, w.rt.destinationsMap)
abortTag := abortReason
w.rt.destinationsMapMu.RUnlock()
if !abort {
Expand Down

0 comments on commit 1b80237

Please sign in to comment.