fix: user transformations timeout #3583

Merged 5 commits on Jul 13, 2023
Changes from 3 commits
app/cluster/integration_test.go: 1 addition, 1 deletion

@@ -210,7 +210,7 @@ func TestDynamicClusterManager(t *testing.T) {
processor.BackendConfig = mockBackendConfig
processor.Transformer = mockTransformer
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup().Times(1)
mockTransformer.EXPECT().Setup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1)

rtFactory := &router.Factory{
Logger: logger.NOP,
enterprise/replay/replay.go: 3 additions, 1 deletion

@@ -6,6 +6,8 @@ import (
"sort"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
@@ -148,7 +150,7 @@ func (handle *Handler) initSourceWorkers(ctx context.Context) {
}
handle.workers[i] = worker
worker.transformer = transformer.NewTransformer()
worker.transformer.Setup()
worker.transformer.Setup(config.Default, handle.log, stats.Default)
go worker.workerProcess(ctx)
}
handle.initSourceWorkersChannel <- true
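
The replay worker illustrates the new `Setup` contract introduced by this PR: the transformer client receives its config, logger, and stats dependencies explicitly instead of reading package-level state. A minimal sketch of that wiring, based on the calls visible in this diff (the helper function itself is illustrative, not part of the PR):

```go
package example

import (
    "github.com/rudderlabs/rudder-go-kit/config"
    "github.com/rudderlabs/rudder-go-kit/logger"
    "github.com/rudderlabs/rudder-go-kit/stats"

    "github.com/rudderlabs/rudder-server/processor/transformer"
)

// setupTransformer wires a transformer client the way the replay worker now
// does: dependencies are injected through Setup rather than resolved from
// globals inside the transformer package.
func setupTransformer(log logger.Logger) {
    t := transformer.NewTransformer()
    // config.Default and stats.Default are the rudder-go-kit package defaults;
    // tests can pass their own config, logger, and stats implementations instead.
    t.Setup(config.Default, log, stats.Default)
    // t is now ready to be handed to whichever worker needs it.
}
```
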
enterprise/replay/sourceWorker.go: 1 addition, 2 deletions

@@ -21,7 +21,6 @@ import (
"github.com/rudderlabs/rudder-go-kit/filemanager"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/processor/transformer"
)

@@ -168,7 +167,7 @@ func (worker *SourceWorkerT) replayJobsInFile(ctx context.Context, filePath stri
}

if transformationVersionID != "" {
response := worker.transformer.Transform(context.TODO(), transEvents, integrations.GetUserTransformURL(), userTransformBatchSize)
response := worker.transformer.UserTransform(context.TODO(), transEvents, userTransformBatchSize)

for _, ev := range response.Events {
destEventJSON, err := json.Marshal(ev.Output[worker.getFieldIdentifier(eventPayload)])
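
With the dedicated `UserTransform` method, the caller no longer builds the user-transformation endpoint via `integrations.GetUserTransformURL()`; the transformer client is expected to resolve that endpoint itself (the resolution is not visible in this diff). A sketch of the new call shape, assuming the `Transformer` interface name behind the generated mock and the `ResponseT`/`TransformerEventT` types used elsewhere in this PR:

```go
package example

import (
    "context"

    "github.com/rudderlabs/rudder-server/processor/transformer"
)

// replayUserTransform mirrors the new call site in sourceWorker.go: only the
// context, the events, and the batch size are passed; the URL argument is gone.
func replayUserTransform(
    ctx context.Context,
    t transformer.Transformer, // assumed interface name behind mock_transformer.go
    events []transformer.TransformerEventT,
    batchSize int,
) transformer.ResponseT {
    return t.UserTransform(ctx, events, batchSize)
}
```
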
mocks/processor/transformer/mock_transformer.go: 26 additions, 9 deletions

(generated file; diff not rendered)

processor/integrations/integrations.go: 5 additions, 57 deletions

@@ -7,46 +7,26 @@ import (

jsoniter "github.com/json-iterator/go"
"github.com/tidwall/gjson"
"golang.org/x/exp/slices"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var (
jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary
destTransformURL string
userTransformURL string
jsonFast = jsoniter.ConfigCompatibleWithStandardLibrary
postParametersTFields []string
)

func Init() {
loadConfig()
func init() {
// This is called in init and it should be a one time call. Making reflect calls during runtime is not a great idea.
// We unmarshal json response from transformer into PostParametersT struct.
// Since unmarshal doesn't check whether the fields are present in the json and instead just initializes them to their zero value, we have to manually do this check on all fields before unmarshaling
// This function gets a list of fields tagged as json from the struct and populates in postParametersTFields
postParametersTFields = misc.GetMandatoryJSONFieldNames(PostParametersT{})
}

func loadConfig() {
destTransformURL = config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
userTransformURL = config.GetString("USER_TRANSFORM_URL", destTransformURL)
}

const (
// PostDataKV means post data is sent as KV
PostDataKV = iota + 1
// PostDataJSON means post data is sent as JSON
PostDataJSON
// PostDataXML means post data is sent as XML
PostDataXML
)

// PostParametersT is a struct for holding all the values from transformerResponse and using them to publish an event to a destination
// optional is a custom tag introduced by us and is handled by GetMandatoryJSONFieldNames. It's intentionally added
// after two commas because the tag that comes after the first comma should be known by the json parser
@@ -73,7 +53,7 @@ type TransResponseT struct {

func CollectDestErrorStats(input []byte) {
var integrationStat TransStatsT
err := jsonfast.Unmarshal(input, &integrationStat)
err := jsonFast.Unmarshal(input, &integrationStat)
if err == nil {
if len(integrationStat.StatTags) > 0 {
stats.Default.NewTaggedStat("integration.failure_detailed", stats.CountType, integrationStat.StatTags).Increment()
@@ -83,7 +63,7 @@ func CollectIntgTransformErrorStats(input []byte) {

func CollectIntgTransformErrorStats(input []byte) {
var integrationStats []TransStatsT
err := jsonfast.Unmarshal(input, &integrationStats)
err := jsonFast.Unmarshal(input, &integrationStats)
if err == nil {
for _, integrationStat := range integrationStats {
if len(integrationStat.StatTags) > 0 {
@@ -95,7 +75,7 @@ func CollectIntgTransformErrorStats(input []byte) {

// ValidatePostInfo parses and validates the transformer response
func ValidatePostInfo(transformRawParams PostParametersT) error {
transformRaw, err := jsonfast.Marshal(transformRawParams)
transformRaw, err := jsonFast.Marshal(transformRawParams)
if err != nil {
return err
}
@@ -160,35 +140,3 @@ func FilterClientIntegrations(clientEvent types.SingularEventT, destNameIDMap ma
retVal = outVal
return
}

// GetTransformerURL gets the transformer base url endpoint
func GetTransformerURL() string {
return destTransformURL
}

// GetDestinationURL returns node URL
func GetDestinationURL(destType string) string {
destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", destTransformURL, strings.ToLower(destType))
if slices.Contains(warehouseutils.WarehouseDestinations, destType) {
whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", config.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
if destType == "RS" {
return destinationEndPoint + "?" + whSchemaVersionQueryParam
}
if destType == "CLICKHOUSE" {
enableArraySupport := fmt.Sprintf("chEnableArraySupport=%s", fmt.Sprintf("%v", config.GetBool("Warehouse.clickhouse.enableArraySupport", false)))
return destinationEndPoint + "?" + whSchemaVersionQueryParam + "&" + enableArraySupport
}
return destinationEndPoint + "?" + whSchemaVersionQueryParam
}
return destinationEndPoint
}

// GetUserTransformURL returns the port of running user transform
func GetUserTransformURL() string {
return userTransformURL + "/customTransform"
}

// GetTrackingPlanValidationURL returns the port of running tracking plan validation
func GetTrackingPlanValidationURL() string {
return destTransformURL + "/v0/validate"
}
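
What remains of this file keeps the pattern the `init()` comments describe: compute the mandatory JSON field names once (reflection is too expensive per event), then check each field's presence with gjson before unmarshaling, because `encoding/json` silently zero-values any missing field. A self-contained sketch of that presence check; the field names below are illustrative, not the actual `PostParametersT` tags:

```go
package example

import (
    "fmt"

    "github.com/tidwall/gjson"
)

// mandatoryFields stands in for postParametersTFields, which the real code
// computes once in init() by reflecting over PostParametersT's json tags.
var mandatoryFields = []string{"type", "endpoint", "method"} // illustrative only

// validateMandatory rejects a transformer response that omits any mandatory
// field, mirroring the presence check ValidatePostInfo performs with gjson.
func validateMandatory(raw []byte) error {
    parsed := gjson.ParseBytes(raw)
    for _, field := range mandatoryFields {
        if !parsed.Get(field).Exists() {
            return fmt.Errorf("missing expected field %q in transformer response", field)
        }
    }
    return nil
}
```
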
processor/manager_test.go: 4 additions, 2 deletions

@@ -13,6 +13,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"

transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"

destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
@@ -234,7 +236,7 @@ func TestProcessorManager(t *testing.T) {
},
)
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup().Times(1).Do(func() {
mockTransformer.EXPECT().Setup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Do(func(*config.Config, logger.Logger, stats.Stats) {
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
})
mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1)
@@ -274,7 +276,7 @@ func TestProcessorManager(t *testing.T) {
)

mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup().Times(1).Do(func() {
mockTransformer.EXPECT().Setup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Do(func(*config.Config, logger.Logger, stats.Stats) {
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
})
mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1)
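
Both test files update their gomock expectations the same way: `Setup` is matched with three `gomock.Any()` arguments, and the `Do` callback's parameter list now mirrors the widened signature. A condensed sketch of the pattern; the mock package import path, the `MockTransformer` type name, and the gomock import path are assumed from the generated file's location rather than shown in this diff:

```go
package example

import (
    "github.com/golang/mock/gomock"

    "github.com/rudderlabs/rudder-go-kit/config"
    "github.com/rudderlabs/rudder-go-kit/logger"
    "github.com/rudderlabs/rudder-go-kit/stats"

    mock_transformer "github.com/rudderlabs/rudder-server/mocks/processor/transformer"
)

// expectSetup registers the expectation used in these tests: Setup must be
// called exactly once, with any config, logger, and stats. The Do callback
// must match Setup's new parameter types; onSetup carries the side effect the
// test needs (e.g. seeding transformerFeatures, as above).
func expectSetup(mockTransformer *mock_transformer.MockTransformer, onSetup func()) {
    mockTransformer.EXPECT().
        Setup(gomock.Any(), gomock.Any(), gomock.Any()).
        Times(1).
        Do(func(*config.Config, logger.Logger, stats.Stats) {
            onSetup()
        })
}
```
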
processor/processor.go: 3 additions, 26 deletions

@@ -444,7 +444,7 @@ func (proc *Handle) Setup(
}
}))

proc.transformer.Setup()
proc.transformer.Setup(config.Default, proc.logger, proc.statsFactory)
proc.crashRecover()
}

@@ -2136,7 +2136,7 @@ func (proc *Handle) transformSrcDest(
}
// REPORTING - END

url := integrations.GetDestinationURL(destType)
var response transformer.ResponseT
var eventsToTransform []transformer.TransformerEventT
// Send to custom transformer only if the destination has a transformer enabled
@@ -2147,7 +2146,7 @@

trace.WithRegion(ctx, "UserTransform", func() {
startedAt := time.Now()
response = proc.transformer.Transform(ctx, eventList, integrations.GetUserTransformURL(), proc.config.userTransformBatchSize)
response = proc.transformer.UserTransform(ctx, eventList, proc.config.userTransformBatchSize)
d := time.Since(startedAt)
userTransformationStat.transformTime.SendTiming(d)

@@ -2278,7 +2277,7 @@ func (proc *Handle) transformSrcDest(
trace.Logf(ctx, "Dest Transform", "input size %d", len(eventsToTransform))
proc.logger.Debug("Dest Transform input size", len(eventsToTransform))
s := time.Now()
response = proc.transformer.Transform(ctx, eventsToTransform, url, proc.config.transformBatchSize)
response = proc.transformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize)

destTransformationStat := proc.newDestinationTransformationStat(sourceID, workspaceID, transformAt, destination)
destTransformationStat.transformTime.Since(s)
@@ -2658,7 +2657,6 @@ func (proc *Handle) jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsources.Stat
})
}

func subJobMerger(mergedJob, subJob *storeMessage) *storeMessage {
mergedJob.statusList = append(mergedJob.statusList, subJob.statusList...)
mergedJob.destJobs = append(mergedJob.destJobs, subJob.destJobs...)
mergedJob.batchDestJobs = append(mergedJob.batchDestJobs, subJob.batchDestJobs...)

mergedJob.procErrorJobs = append(mergedJob.procErrorJobs, subJob.procErrorJobs...)
for id, job := range subJob.procErrorJobsByDestID {
mergedJob.procErrorJobsByDestID[id] = append(mergedJob.procErrorJobsByDestID[id], job...)
}

mergedJob.reportMetrics = append(mergedJob.reportMetrics, subJob.reportMetrics...)
for dupStatKey, count := range subJob.sourceDupStats {
mergedJob.sourceDupStats[dupStatKey] += count
}
for id, v := range subJob.uniqueMessageIds {
mergedJob.uniqueMessageIds[id] = v
}
mergedJob.totalEvents += subJob.totalEvents

return mergedJob
}

func throughputPerSecond(processedJob int, timeTaken time.Duration) int {
normalizedTime := float64(timeTaken) / float64(time.Second)
return int(float64(processedJob) / normalizedTime)