Skip to content

Commit

Permalink
chore: send filtered events in ut as dropped (#3972)
Browse files Browse the repository at this point in the history
  • Loading branch information
chandumlg committed Oct 13, 2023
1 parent 5a74b34 commit b7590d2
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 3 deletions.
2 changes: 1 addition & 1 deletion jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ ENUM waiting, executing, succeeded, waiting_retry, failed, aborted
*/
type JobStatusT struct {
JobID int64 `json:"JobID"`
JobState string `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrating, migrated, wont_migrate
JobState string `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry, filtered, failed, aborted, migrating, migrated, wont_migrate
AttemptNum int `json:"AttemptNum"`
ExecTime time.Time `json:"ExecTime"`
RetryTime time.Time `json:"RetryTime"`
Expand Down
4 changes: 4 additions & 0 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2357,6 +2357,10 @@ func (proc *Handle) transformSrcDest(
var successCountMetadataMap map[string]MetricMetadata
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, transformer.EventFilterStage, transformationEnabled, trackingPlanEnabled)
droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventsToTransform), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...)
if _, ok := procErrorJobsByDestID[destID]; !ok {
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
}
procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.EventFilterStage, trackingPlanEnabled, transformationEnabled)
proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform))

Expand Down
12 changes: 10 additions & 2 deletions services/debugger/transformation/transformationStatusUploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,18 +377,26 @@ func (h *Handle) processRecordTransformationStatus(tStatus *TransformationStatus
singularEventWithReceivedAt := tStatus.EventsByMessageID[msgID]
eventBefore := getEventBeforeTransform(singularEventWithReceivedAt.SingularEvent, singularEventWithReceivedAt.ReceivedAt)
eventAfter := &EventsAfterTransform{
Error: failedEvent.Error,
ReceivedAt: time.Now().Format(misc.RFC3339Milli),
StatusCode: failedEvent.StatusCode,
}
var isError bool
switch failedEvent.StatusCode {
case types.FilterEventCode:
eventAfter.IsDropped = true
isError = false
default:
eventAfter.Error = failedEvent.Error
isError = true
}

h.RecordTransformationStatus(&TransformStatusT{
TransformationID: tID,
SourceID: tStatus.SourceID,
DestinationID: tStatus.DestID,
EventBefore: eventBefore,
EventsAfter: eventAfter,
IsError: true,
IsError: isError,
})
}
}
Expand Down

0 comments on commit b7590d2

Please sign in to comment.