Skip to content

Commit

Permalink
fix: limit entire transformationStatus struct when caching them (#2928)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Feb 6, 2023
1 parent 2625f1a commit 2283aef
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewHandle(backendConfig backendconfig.BackendConfig, opts ...Opt) (Destinat
log: logger.NewLogger().Child("debugger").Child("destination"),
}
var err error
url := fmt.Sprintf("%s/dataplane/v2/eventUploads", h.configBackendURL)
url := fmt.Sprintf("%s/dataplane/v2/eventDeliveryStatus", h.configBackendURL)
eventUploader := NewEventDeliveryStatusUploader(h.log)
h.uploader = debugger.New[*DeliveryStatusT](url, eventUploader)
h.uploader.Start()
Expand Down
45 changes: 39 additions & 6 deletions services/debugger/transformation/transformationStatusUploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,41 @@ func (h *Handle) backendConfigSubscriber(backendConfig backendconfig.BackendConf
close(h.done)
}

// limit the number of stored events
func (ts *TransformationStatusT) Limit(
limit int,
transformation backendconfig.TransformationT,
) *TransformationStatusT {
ts.Destination.Transformations = []backendconfig.TransformationT{transformation}
ts.UserTransformedEvents = lo.Slice(ts.UserTransformedEvents, 0, limit)
ts.FailedEvents = lo.Slice(ts.FailedEvents, 0, limit)
messageIDs := lo.SliceToMap(
append(
lo.Map(
ts.UserTransformedEvents,
func(event transformer.TransformerEventT, _ int) string {
return event.Metadata.MessageID
},
),
lo.Map(
ts.FailedEvents,
func(event transformer.TransformerResponseT, _ int) string {
return event.Metadata.MessageID
},
)...,
),
func(messageID string) (string, struct{}) {
return messageID, struct{}{}
},
)
ts.UniqueMessageIds = messageIDs
ts.EventsByMessageID = lo.PickByKeys(
ts.EventsByMessageID,
lo.Keys(messageIDs),
)
return ts
}

func (h *Handle) UploadTransformationStatus(tStatus *TransformationStatusT) bool {
defer func() {
if r := recover(); r != nil {
Expand All @@ -242,12 +277,10 @@ func (h *Handle) UploadTransformationStatus(tStatus *TransformationStatusT) bool
if h.IsUploadEnabled(transformation.ID) {
h.processRecordTransformationStatus(tStatus, transformation.ID)
} else {
tStatusUpdated := *tStatus
lo.Slice(tStatusUpdated.UserTransformedEvents, 0, h.limitEventsInMemory+1)
tStatusUpdated.Destination.Transformations = []backendconfig.TransformationT{transformation}
tStatusUpdated.UserTransformedEvents = lo.Slice(tStatusUpdated.UserTransformedEvents, 0, h.limitEventsInMemory+1)
tStatusUpdated.FailedEvents = lo.Slice(tStatusUpdated.FailedEvents, 0, h.limitEventsInMemory+1)
err := h.transformationCacheMap.Update(transformation.ID, tStatusUpdated)
err := h.transformationCacheMap.Update(
transformation.ID,
*(tStatus.Limit(h.limitEventsInMemory+1, transformation)),
)
if err != nil {
h.log.Errorf("Error while updating transformation cache: %v", err)
return false
Expand Down
127 changes: 127 additions & 0 deletions services/debugger/transformation/transformationStatusUploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package transformationdebugger_test
import (
"context"
"path"
"testing"
"time"

"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/testhelper/rand"
"github.com/stretchr/testify/require"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -314,3 +317,127 @@ var _ = Describe("eventDeliveryStatusUploader", func() {
})
})
})

func TestLimit(t *testing.T) {
var (
singularEvent1 = types.SingularEventT{"payload": "event-1"}
metadata1 = transformer.MetadataT{MessageID: "message-id-1"}
singularEvent2 = types.SingularEventT{"payload": "event-2"}
metadata2 = transformer.MetadataT{MessageID: "message-id-2"}
singularEvent3 = types.SingularEventT{"payload": "event-3"}
metadata3 = transformer.MetadataT{MessageID: "message-id-3"}
singularEvent4 = types.SingularEventT{"payload": "event-4"}
metadata4 = transformer.MetadataT{MessageID: "message-id-4"}
now = time.Now()
limit = 1
)
t.Run("should limit the received transformation status", func(t *testing.T) {
tStatus := &transformationdebugger.TransformationStatusT{
Destination: &sampleBackendConfig.Sources[1].Destinations[1],
DestID: sampleBackendConfig.Sources[1].Destinations[1].ID,
SourceID: sampleBackendConfig.Sources[1].ID,
UserTransformedEvents: []transformer.TransformerEventT{
{
Message: singularEvent1,
Metadata: metadata1,
},
{
Message: singularEvent2,
Metadata: metadata2,
},
},
EventsByMessageID: map[string]types.SingularEventWithReceivedAt{
metadata1.MessageID: {
SingularEvent: singularEvent1,
ReceivedAt: now,
},
metadata2.MessageID: {
SingularEvent: singularEvent2,
ReceivedAt: now,
},
metadata3.MessageID: {
SingularEvent: singularEvent3,
ReceivedAt: now,
},
metadata4.MessageID: {
SingularEvent: singularEvent4,
ReceivedAt: now,
},
},
FailedEvents: []transformer.TransformerResponseT{
{
Output: singularEvent1,
Metadata: metadata3,
Error: "some_error1",
},
{
Output: singularEvent2,
Metadata: metadata4,
Error: "some_error2",
},
},
UniqueMessageIds: map[string]struct{}{
metadata1.MessageID: {},
metadata2.MessageID: {},
metadata3.MessageID: {},
metadata4.MessageID: {},
},
}
limitedTStatus := tStatus.Limit(
1,
sampleBackendConfig.Sources[1].Destinations[1].Transformations[0],
)
require.Equal(t, limit, len(limitedTStatus.UserTransformedEvents))
require.Equal(t, limit, len(limitedTStatus.FailedEvents))
require.Equal(t, limit*2, len(limitedTStatus.UniqueMessageIds))
require.Equal(t, limit*2, len(limitedTStatus.EventsByMessageID))
require.Equal(
t,
[]backendconfig.TransformationT{sampleBackendConfig.Sources[1].Destinations[1].Transformations[0]},
limitedTStatus.Destination.Transformations,
)
require.Equal(
t,
[]transformer.TransformerEventT{
{
Message: singularEvent1,
Metadata: metadata1,
},
},
limitedTStatus.UserTransformedEvents,
)
require.Equal(
t,
[]transformer.TransformerResponseT{
{
Output: singularEvent1,
Metadata: metadata3,
Error: "some_error1",
},
},
limitedTStatus.FailedEvents,
)
require.Equal(
t,
map[string]struct{}{
metadata1.MessageID: {},
metadata3.MessageID: {},
},
limitedTStatus.UniqueMessageIds,
)
require.Equal(
t,
map[string]types.SingularEventWithReceivedAt{
metadata1.MessageID: {
SingularEvent: singularEvent1,
ReceivedAt: now,
},
metadata3.MessageID: {
SingularEvent: singularEvent3,
ReceivedAt: now,
},
},
limitedTStatus.EventsByMessageID,
)
})
}

0 comments on commit 2283aef

Please sign in to comment.