Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: send filtered events in ut as dropped #3972

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ ENUM waiting, executing, succeeded, waiting_retry, failed, aborted
*/
type JobStatusT struct {
JobID int64 `json:"JobID"`
JobState string `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrating, migrated, wont_migrate
JobState string `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry, filtered, failed, aborted, migrating, migrated, wont_migrate
AttemptNum int `json:"AttemptNum"`
ExecTime time.Time `json:"ExecTime"`
RetryTime time.Time `json:"RetryTime"`
Expand Down
4 changes: 4 additions & 0 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2357,6 +2357,10 @@ func (proc *Handle) transformSrcDest(
var successCountMetadataMap map[string]MetricMetadata
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, transformer.EventFilterStage, transformationEnabled, trackingPlanEnabled)
droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventsToTransform), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...)
if _, ok := procErrorJobsByDestID[destID]; !ok {
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
}
procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.EventFilterStage, trackingPlanEnabled, transformationEnabled)
proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform))

Expand Down
12 changes: 10 additions & 2 deletions services/debugger/transformation/transformationStatusUploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,18 +377,26 @@
singularEventWithReceivedAt := tStatus.EventsByMessageID[msgID]
eventBefore := getEventBeforeTransform(singularEventWithReceivedAt.SingularEvent, singularEventWithReceivedAt.ReceivedAt)
eventAfter := &EventsAfterTransform{
Error: failedEvent.Error,
ReceivedAt: time.Now().Format(misc.RFC3339Milli),
StatusCode: failedEvent.StatusCode,
}
var isError bool
switch failedEvent.StatusCode {
case types.FilterEventCode:
eventAfter.IsDropped = true
isError = false
default:
eventAfter.Error = failedEvent.Error
isError = true

Check warning on line 390 in services/debugger/transformation/transformationStatusUploader.go

View check run for this annotation

Codecov / codecov/patch

services/debugger/transformation/transformationStatusUploader.go#L383-L390

Added lines #L383 - L390 were not covered by tests
}

h.RecordTransformationStatus(&TransformStatusT{
TransformationID: tID,
SourceID: tStatus.SourceID,
DestinationID: tStatus.DestID,
EventBefore: eventBefore,
EventsAfter: eventAfter,
IsError: true,
IsError: isError,

Check warning on line 399 in services/debugger/transformation/transformationStatusUploader.go

View check run for this annotation

Codecov / codecov/patch

services/debugger/transformation/transformationStatusUploader.go#L399

Added line #L399 was not covered by tests
})
}
}
Expand Down
Loading