Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: user transformations timeout #3583

Merged
merged 5 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ func TestDynamicClusterManager(t *testing.T) {
processor.BackendConfig = mockBackendConfig
processor.Transformer = mockTransformer
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup().Times(1)

rtFactory := &router.Factory{
Logger: logger.NOP,
Expand Down
5 changes: 3 additions & 2 deletions enterprise/replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"sort"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
Expand Down Expand Up @@ -147,8 +149,7 @@
uploader: handle.uploader,
}
handle.workers[i] = worker
worker.transformer = transformer.NewTransformer()
worker.transformer.Setup()
worker.transformer = transformer.NewTransformer(config.Default, handle.log, stats.Default)

Check warning on line 152 in enterprise/replay/replay.go

View check run for this annotation

Codecov / codecov/patch

enterprise/replay/replay.go#L152

Added line #L152 was not covered by tests
go worker.workerProcess(ctx)
}
handle.initSourceWorkersChannel <- true
Expand Down
9 changes: 4 additions & 5 deletions enterprise/replay/sourceWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
"github.com/rudderlabs/rudder-go-kit/filemanager"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/processor/transformer"
)

Expand Down Expand Up @@ -113,7 +112,7 @@

var jobs []*jobsdb.JobT

var transEvents []transformer.TransformerEventT
var transEvents []transformer.TransformerEvent

Check warning on line 115 in enterprise/replay/sourceWorker.go

View check run for this annotation

Codecov / codecov/patch

enterprise/replay/sourceWorker.go#L115

Added line #L115 was not covered by tests
transformationVersionID := config.GetString("TRANSFORMATION_VERSION_ID", "")

for sc.Scan() {
Expand Down Expand Up @@ -151,14 +150,14 @@

messageID := uuid.New().String()

metadata := transformer.MetadataT{
metadata := transformer.Metadata{

Check warning on line 153 in enterprise/replay/sourceWorker.go

View check run for this annotation

Codecov / codecov/patch

enterprise/replay/sourceWorker.go#L153

Added line #L153 was not covered by tests
MessageID: messageID,
DestinationID: gjson.GetBytes(copyLineBytes, "parameters.destination_id").String(),
}

transformation := backendconfig.TransformationT{VersionID: config.GetString("TRANSFORMATION_VERSION_ID", "")}

transEvent := transformer.TransformerEventT{
transEvent := transformer.TransformerEvent{

Check warning on line 160 in enterprise/replay/sourceWorker.go

View check run for this annotation

Codecov / codecov/patch

enterprise/replay/sourceWorker.go#L160

Added line #L160 was not covered by tests
Message: message,
Metadata: metadata,
Destination: backendconfig.DestinationT{Transformations: []backendconfig.TransformationT{transformation}},
Expand All @@ -168,7 +167,7 @@
}

if transformationVersionID != "" {
response := worker.transformer.Transform(context.TODO(), transEvents, integrations.GetUserTransformURL(), userTransformBatchSize)
response := worker.transformer.UserTransform(context.TODO(), transEvents, userTransformBatchSize)

Check warning on line 170 in enterprise/replay/sourceWorker.go

View check run for this annotation

Codecov / codecov/patch

enterprise/replay/sourceWorker.go#L170

Added line #L170 was not covered by tests

for _, ev := range response.Events {
destEventJSON, err := json.Marshal(ev.Output[worker.getFieldIdentifier(eventPayload)])
Expand Down
32 changes: 17 additions & 15 deletions mocks/processor/transformer/mock_transformer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions processor/eventfilter/eventfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func GetSupportedMessageEvents(destination *backendconfig.DestinationT) ([]strin
}

type AllowTransformerEventParams struct {
TransformerEvent *transformer.TransformerEventT
TransformerEvent *transformer.TransformerEvent
SupportedMessageTypes []string
}

Expand Down Expand Up @@ -123,13 +123,13 @@ Currently this method supports below validations(executed in the same order):

2. Validate if the event is sendable to destination based on connectionMode, sourceType & messageType
*/
func AllowEventToDestTransformation(transformerEvent *transformer.TransformerEventT, supportedMsgTypes []string) (bool, *transformer.TransformerResponseT) {
func AllowEventToDestTransformation(transformerEvent *transformer.TransformerEvent, supportedMsgTypes []string) (bool, *transformer.TransformerResponse) {
// MessageType filtering -- STARTS
messageType := strings.TrimSpace(strings.ToLower(getMessageType(&transformerEvent.Message)))
if messageType == "" {
// We will abort the event
errMessage := "Invalid message type. Type assertion failed"
resp := &transformer.TransformerResponseT{
resp := &transformer.TransformerResponse{
Output: transformerEvent.Message, StatusCode: 400,
Metadata: transformerEvent.Metadata,
Error: errMessage,
Expand Down Expand Up @@ -193,7 +193,7 @@ Example:
...
}
*/
func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterParams) (bool, *transformer.TransformerResponseT) {
func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterParams) (bool, *transformer.TransformerResponse) {
destination := connectionModeFilterParams.Destination
srcType := strings.TrimSpace(connectionModeFilterParams.SrcType)
messageType := connectionModeFilterParams.Event.MessageType
Expand Down
62 changes: 5 additions & 57 deletions processor/integrations/integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,26 @@ import (

jsoniter "github.com/json-iterator/go"
"github.com/tidwall/gjson"
"golang.org/x/exp/slices"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var (
jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary
destTransformURL string
userTransformURL string
json = jsoniter.ConfigCompatibleWithStandardLibrary
postParametersTFields []string
)

func Init() {
loadConfig()
func init() {
// This is called in init and it should be a one time call. Making reflect calls during runtime is not a great idea.
// We unmarshal json response from transformer into PostParametersT struct.
// Since unmarshal doesn't check if the fields are present in the json or not and instead just initialze to zero value, we have to manually do this check on all fields before unmarshaling
// This function gets a list of fields tagged as json from the struct and populates in postParametersTFields
postParametersTFields = misc.GetMandatoryJSONFieldNames(PostParametersT{})
}

func loadConfig() {
destTransformURL = config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
userTransformURL = config.GetString("USER_TRANSFORM_URL", destTransformURL)
}

const (
// PostDataKV means post data is sent as KV
PostDataKV = iota + 1
// PostDataJSON means post data is sent as JSON
PostDataJSON
// PostDataXML means post data is sent as XML
PostDataXML
)

// PostParametersT is a struct for holding all the values from transformerResponse and use them to publish an event to a destination
// optional is a custom tag introduced by us and is handled by GetMandatoryJSONFieldNames. Its intentionally added
// after two commas because the tag that comes after the first comma should be known by json parser
Expand All @@ -73,7 +53,7 @@ type TransResponseT struct {

func CollectDestErrorStats(input []byte) {
var integrationStat TransStatsT
err := jsonfast.Unmarshal(input, &integrationStat)
err := json.Unmarshal(input, &integrationStat)
if err == nil {
if len(integrationStat.StatTags) > 0 {
stats.Default.NewTaggedStat("integration.failure_detailed", stats.CountType, integrationStat.StatTags).Increment()
Expand All @@ -83,7 +63,7 @@ func CollectDestErrorStats(input []byte) {

func CollectIntgTransformErrorStats(input []byte) {
var integrationStats []TransStatsT
err := jsonfast.Unmarshal(input, &integrationStats)
err := json.Unmarshal(input, &integrationStats)
if err == nil {
for _, integrationStat := range integrationStats {
if len(integrationStat.StatTags) > 0 {
Expand All @@ -95,7 +75,7 @@ func CollectIntgTransformErrorStats(input []byte) {

// GetPostInfo parses the transformer response
func ValidatePostInfo(transformRawParams PostParametersT) error {
transformRaw, err := jsonfast.Marshal(transformRawParams)
transformRaw, err := json.Marshal(transformRawParams)
if err != nil {
return err
}
Expand Down Expand Up @@ -160,35 +140,3 @@ func FilterClientIntegrations(clientEvent types.SingularEventT, destNameIDMap ma
retVal = outVal
return
}

// GetTransformerURL gets the transfomer base url endpoint
func GetTransformerURL() string {
return destTransformURL
}

// GetDestinationURL returns node URL
func GetDestinationURL(destType string) string {
destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", destTransformURL, strings.ToLower(destType))
if slices.Contains(warehouseutils.WarehouseDestinations, destType) {
whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", config.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
if destType == "RS" {
return destinationEndPoint + "?" + whSchemaVersionQueryParam
}
if destType == "CLICKHOUSE" {
enableArraySupport := fmt.Sprintf("chEnableArraySupport=%s", fmt.Sprintf("%v", config.GetBool("Warehouse.clickhouse.enableArraySupport", false)))
return destinationEndPoint + "?" + whSchemaVersionQueryParam + "&" + enableArraySupport
}
return destinationEndPoint + "?" + whSchemaVersionQueryParam
}
return destinationEndPoint
}

// GetUserTransformURL returns the port of running user transform
func GetUserTransformURL() string {
return userTransformURL + "/customTransform"
}

// GetTrackingPlanValidationURL returns the port of running tracking plan validation
func GetTrackingPlanValidationURL() string {
return destTransformURL + "/v0/validate"
}
fracasula marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 5 additions & 1 deletion processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"sync"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"

destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
Expand Down Expand Up @@ -87,7 +91,7 @@ func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDb, esDB *job
opts ...Opts,
) *LifecycleManager {
proc := &LifecycleManager{
Handle: NewHandle(transformer.NewTransformer()),
Handle: NewHandle(transformer.NewTransformer(config.Default, logger.NewLogger().Child("processor"), stats.Default)),
mainCtx: ctx,
gatewayDB: gwDb,
routerDB: rtDb,
Expand Down
8 changes: 2 additions & 6 deletions processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,7 @@ func TestProcessorManager(t *testing.T) {
},
)
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup().Times(1).Do(func() {
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
})
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1)
processor.BackendConfig = mockBackendConfig
processor.Handle.transformer = mockTransformer
Expand Down Expand Up @@ -274,9 +272,7 @@ func TestProcessorManager(t *testing.T) {
)

mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup().Times(1).Do(func() {
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
})
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1)

require.NoError(t, processor.Start())
Expand Down
Loading
Loading