Skip to content

Commit

Permalink
fixup! review
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Oct 31, 2023
1 parent 0488d4f commit c94e405
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 30 deletions.
51 changes: 37 additions & 14 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1641,18 +1641,6 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
if !proc.isDestinationAvailable(singularEvent, sourceId) {
continue
}
// check if jobRunId is cancelled
if commonMetadataFromSingularEvent.SourceJobRunID != "" &&
slices.Contains(
proc.drainConfig.jobRunIDs.Load(),
commonMetadataFromSingularEvent.SourceJobRunID,
) {
proc.logger.Debugf(
"cancelled jobRunID: %s",
commonMetadataFromSingularEvent.SourceJobRunID,
)
continue
}

if _, ok := groupedEventsBySourceId[SourceIDT(sourceId)]; !ok {
groupedEventsBySourceId[SourceIDT(sourceId)] = make([]transformer.TransformerEvent, 0)
Expand Down Expand Up @@ -2397,7 +2385,24 @@ func (proc *Handle) transformSrcDest(
s := time.Now()
eventFilterInCount := len(eventsToTransform)
proc.logger.Debug("Supported messages filtering input size", eventFilterInCount)
response = ConvertToFilteredTransformerResponse(eventsToTransform, transformAt != "none")
response = ConvertToFilteredTransformerResponse(
eventsToTransform,
transformAt != "none",
func(event transformer.TransformerEvent) (bool, string) {
if event.Metadata.SourceJobRunID != "" &&
slices.Contains(
proc.drainConfig.jobRunIDs.Load(),
event.Metadata.SourceJobRunID,
) {
proc.logger.Debugf(
"cancelled jobRunID: %s",
event.Metadata.SourceJobRunID,
)
return true, "cancelled jobRunId"
}
return false, ""
},
)
var successMetrics []*types.PUReportedMetric
var successCountMap map[string]int64
var successCountMetadataMap map[string]MetricMetadata
Expand Down Expand Up @@ -2624,7 +2629,11 @@ func (proc *Handle) saveDroppedJobs(droppedJobs []*jobsdb.JobT, tx *Tx) error {
return nil
}

func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent, filter bool) transformer.Response {
func ConvertToFilteredTransformerResponse(
events []transformer.TransformerEvent,
filter bool,
drainFunc func(transformer.TransformerEvent) (bool, string),
) transformer.Response {
var responses []transformer.TransformerResponse
var failedEvents []transformer.TransformerResponse

Expand All @@ -2639,6 +2648,20 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent,
for i := range events {
event := &events[i]

// drain events
if drain, reason := drainFunc(*event); drain {
failedEvents = append(
failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: types.DrainEventCode,
Metadata: event.Metadata,
Error: reason,
},
)
continue
}

if filter {
// filter unsupported message types
supportedTypes, ok := supportedMessageTypesCache[event.Destination.ID]
Expand Down
33 changes: 18 additions & 15 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2101,6 +2101,7 @@ var _ = Describe("Processor", Ordered, func() {
})

c.MockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), "job_run_id_1", gomock.Any(), rsources.Stats{Out: 1}).Times(1)
c.MockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), "job_run_id_1", gomock.Any(), rsources.Stats{In: 1, Failed: 1}).Times(1)

c.mockArchivalDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).AnyTimes().Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) {
_ = f(jobsdb.EmptyStoreSafeTx())
Expand All @@ -2113,8 +2114,10 @@ var _ = Describe("Processor", Ordered, func() {
c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Return(nil).Times(0)

// One Store call is expected for all events
c.mockWriteProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(0).
Do(func(ctx context.Context, jobs []*jobsdb.JobT) {})
c.mockWriteProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1).
Do(func(ctx context.Context, jobs []*jobsdb.JobT) {
Expect(jobs).To(HaveLen(1))
})

config.Set("RSources.toAbortJobRunIDs", "job_run_id_1")
defer config.Reset()
Expand Down Expand Up @@ -2406,7 +2409,7 @@ var _ = Describe("Static Function Tests", func() {
},
},
}
response := ConvertToFilteredTransformerResponse(events, false)
response := ConvertToFilteredTransformerResponse(events, false, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response.Events[0].StatusCode).To(Equal(expectedResponses.Events[0].StatusCode))
Expect(response.Events[0].Metadata.MessageID).To(Equal(expectedResponses.Events[0].Metadata.MessageID))
Expect(response.Events[0].Output["some-key-1"]).To(Equal(expectedResponses.Events[0].Output["some-key-1"]))
Expand Down Expand Up @@ -2658,7 +2661,7 @@ var _ = Describe("Static Function Tests", func() {
},
},
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})

Expand Down Expand Up @@ -2731,7 +2734,7 @@ var _ = Describe("Static Function Tests", func() {
},
},
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})

Expand Down Expand Up @@ -2794,7 +2797,7 @@ var _ = Describe("Static Function Tests", func() {
},
FailedEvents: nil,
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})
It("Should allow all events when supportedMessageTypes is not an array", func() {
Expand Down Expand Up @@ -2858,7 +2861,7 @@ var _ = Describe("Static Function Tests", func() {
},
FailedEvents: nil,
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})

Expand Down Expand Up @@ -2929,7 +2932,7 @@ var _ = Describe("Static Function Tests", func() {
},
},
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})

Expand Down Expand Up @@ -2999,7 +3002,7 @@ var _ = Describe("Static Function Tests", func() {
},
FailedEvents: nil,
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})

Expand Down Expand Up @@ -3095,7 +3098,7 @@ var _ = Describe("Static Function Tests", func() {
},
},
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})

Expand Down Expand Up @@ -3191,7 +3194,7 @@ var _ = Describe("Static Function Tests", func() {
},
},
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})

Expand Down Expand Up @@ -3286,7 +3289,7 @@ var _ = Describe("Static Function Tests", func() {
},
},
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})

Expand Down Expand Up @@ -3376,7 +3379,7 @@ var _ = Describe("Static Function Tests", func() {
},
},
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})

Expand Down Expand Up @@ -3469,7 +3472,7 @@ var _ = Describe("Static Function Tests", func() {
},
},
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})

Expand Down Expand Up @@ -3565,7 +3568,7 @@ var _ = Describe("Static Function Tests", func() {
},
},
}
response := ConvertToFilteredTransformerResponse(events, true)
response := ConvertToFilteredTransformerResponse(events, true, func(event transformer.TransformerEvent) (bool, string) { return false, "" })
Expect(response).To(Equal(expectedResponse))
})
})
Expand Down
3 changes: 2 additions & 1 deletion router/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
DrainReasonDestDisabled = "destination is disabled"
DrainReasonDestAbort = "destination configured to abort"
DrainReasonJobRunIDCancelled = "cancelled jobRunID"
DrainReasonJobExpired = "job expired"
)

type DestinationWithSources struct {
Expand Down Expand Up @@ -140,7 +141,7 @@ func (d *drainer) Drain(
_ = json.Unmarshal(job.Parameters, &jobParams)
destID := jobParams.DestinationID
if time.Since(createdAt) > getRetentionTimeForDestination(destID) {
return true, "job expired"
return true, DrainReasonJobExpired
}

if destination, ok := d.destinationResolver(destID); !ok {
Expand Down
1 change: 1 addition & 0 deletions utils/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
const (
FilterEventCode = 298
SuppressEventCode = 299
DrainEventCode = 410
)

// SingularEventT single event structrue
Expand Down

0 comments on commit c94e405

Please sign in to comment.