Skip to content

Commit

Permalink
chore: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jul 6, 2023
1 parent 133c846 commit 83fc2ed
Show file tree
Hide file tree
Showing 11 changed files with 466 additions and 154 deletions.
4 changes: 1 addition & 3 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"

mock_jobs_forwarder "github.com/rudderlabs/rudder-server/mocks/jobs-forwarder"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"

Expand Down Expand Up @@ -212,7 +210,7 @@ func TestDynamicClusterManager(t *testing.T) {
processor.BackendConfig = mockBackendConfig
processor.Transformer = mockTransformer
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup(config.Default, logger.Default, stats.Default).Times(1)
mockTransformer.EXPECT().Setup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1)

rtFactory := &router.Factory{
Logger: logger.NOP,
Expand Down
3 changes: 1 addition & 2 deletions enterprise/replay/sourceWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/rudderlabs/rudder-go-kit/filemanager"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/processor/transformer"
)

Expand Down Expand Up @@ -168,7 +167,7 @@ func (worker *SourceWorkerT) replayJobsInFile(ctx context.Context, filePath stri
}

if transformationVersionID != "" {
response := worker.transformer.Transform(context.TODO(), transEvents, integrations.GetUserTransformURL(), userTransformBatchSize, transformer.UserTransformerStage)
response := worker.transformer.UserTransform(context.TODO(), transEvents, userTransformBatchSize)

Check warning on line 170 in enterprise/replay/sourceWorker.go

View check run for this annotation

Codecov / codecov/patch

enterprise/replay/sourceWorker.go#L170

Added line #L170 was not covered by tests

for _, ev := range response.Events {
destEventJSON, err := json.Marshal(ev.Output[worker.getFieldIdentifier(eventPayload)])
Expand Down
36 changes: 32 additions & 4 deletions mocks/processor/transformer/mock_transformer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 5 additions & 57 deletions processor/integrations/integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,26 @@ import (

jsoniter "github.com/json-iterator/go"
"github.com/tidwall/gjson"
"golang.org/x/exp/slices"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var (
jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary
destTransformURL string
userTransformURL string
jsonFast = jsoniter.ConfigCompatibleWithStandardLibrary
postParametersTFields []string
)

func Init() {
loadConfig()
func init() {
// This is called in init and it should be a one time call. Making reflect calls during runtime is not a great idea.
// We unmarshal json response from transformer into PostParametersT struct.
// Since unmarshal doesn't check if the fields are present in the json or not and instead just initialze to zero value, we have to manually do this check on all fields before unmarshaling
// This function gets a list of fields tagged as json from the struct and populates in postParametersTFields
postParametersTFields = misc.GetMandatoryJSONFieldNames(PostParametersT{})
}

func loadConfig() {
destTransformURL = config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
userTransformURL = config.GetString("USER_TRANSFORM_URL", destTransformURL)
}

const (
// PostDataKV means post data is sent as KV
PostDataKV = iota + 1
// PostDataJSON means post data is sent as JSON
PostDataJSON
// PostDataXML means post data is sent as XML
PostDataXML
)

// PostParametersT is a struct for holding all the values from transformerResponse and use them to publish an event to a destination
// optional is a custom tag introduced by us and is handled by GetMandatoryJSONFieldNames. Its intentionally added
// after two commas because the tag that comes after the first comma should be known by json parser
Expand All @@ -73,7 +53,7 @@ type TransResponseT struct {

func CollectDestErrorStats(input []byte) {
var integrationStat TransStatsT
err := jsonfast.Unmarshal(input, &integrationStat)
err := jsonFast.Unmarshal(input, &integrationStat)
if err == nil {
if len(integrationStat.StatTags) > 0 {
stats.Default.NewTaggedStat("integration.failure_detailed", stats.CountType, integrationStat.StatTags).Increment()
Expand All @@ -83,7 +63,7 @@ func CollectDestErrorStats(input []byte) {

func CollectIntgTransformErrorStats(input []byte) {
var integrationStats []TransStatsT
err := jsonfast.Unmarshal(input, &integrationStats)
err := jsonFast.Unmarshal(input, &integrationStats)
if err == nil {
for _, integrationStat := range integrationStats {
if len(integrationStat.StatTags) > 0 {
Expand All @@ -95,7 +75,7 @@ func CollectIntgTransformErrorStats(input []byte) {

// GetPostInfo parses the transformer response
func ValidatePostInfo(transformRawParams PostParametersT) error {
transformRaw, err := jsonfast.Marshal(transformRawParams)
transformRaw, err := jsonFast.Marshal(transformRawParams)
if err != nil {
return err
}
Expand Down Expand Up @@ -160,35 +140,3 @@ func FilterClientIntegrations(clientEvent types.SingularEventT, destNameIDMap ma
retVal = outVal
return
}

// GetTransformerURL gets the transfomer base url endpoint
func GetTransformerURL() string {
return destTransformURL
}

// GetDestinationURL returns node URL
func GetDestinationURL(destType string) string {
destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", destTransformURL, strings.ToLower(destType))
if slices.Contains(warehouseutils.WarehouseDestinations, destType) {
whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", config.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
if destType == "RS" {
return destinationEndPoint + "?" + whSchemaVersionQueryParam
}
if destType == "CLICKHOUSE" {
enableArraySupport := fmt.Sprintf("chEnableArraySupport=%s", fmt.Sprintf("%v", config.GetBool("Warehouse.clickhouse.enableArraySupport", false)))
return destinationEndPoint + "?" + whSchemaVersionQueryParam + "&" + enableArraySupport
}
return destinationEndPoint + "?" + whSchemaVersionQueryParam
}
return destinationEndPoint
}

// GetUserTransformURL returns the port of running user transform
func GetUserTransformURL() string {
return userTransformURL + "/customTransform"
}

// GetTrackingPlanValidationURL returns the port of running tracking plan validation
func GetTrackingPlanValidationURL() string {
return destTransformURL + "/v0/validate"
}
4 changes: 2 additions & 2 deletions processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestProcessorManager(t *testing.T) {
},
)
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup(config.Default, logger.Default, stats.Default).Times(1).Do(func() {
mockTransformer.EXPECT().Setup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Do(func(*config.Config, logger.Logger, stats.Stats) {
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
})
mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1)
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestProcessorManager(t *testing.T) {
)

mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup(config.Default, logger.Default, stats.Default).Times(1).Do(func() {
mockTransformer.EXPECT().Setup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Do(func(*config.Config, logger.Logger, stats.Stats) {
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
})
mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1)
Expand Down
5 changes: 2 additions & 3 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2136,7 +2136,6 @@ func (proc *Handle) transformSrcDest(
}
// REPORTING - END

url := integrations.GetDestinationURL(destType)
var response transformer.ResponseT
var eventsToTransform []transformer.TransformerEventT
// Send to custom transformer only if the destination has a transformer enabled
Expand All @@ -2147,7 +2146,7 @@ func (proc *Handle) transformSrcDest(

trace.WithRegion(ctx, "UserTransform", func() {
startedAt := time.Now()
response = proc.transformer.Transform(ctx, eventList, integrations.GetUserTransformURL(), proc.config.userTransformBatchSize, transformer.UserTransformerStage)
response = proc.transformer.UserTransform(ctx, eventList, proc.config.userTransformBatchSize)
d := time.Since(startedAt)
userTransformationStat.transformTime.SendTiming(d)

Expand Down Expand Up @@ -2278,7 +2277,7 @@ func (proc *Handle) transformSrcDest(
trace.Logf(ctx, "Dest Transform", "input size %d", len(eventsToTransform))
proc.logger.Debug("Dest Transform input size", len(eventsToTransform))
s := time.Now()
response = proc.transformer.Transform(ctx, eventsToTransform, url, proc.config.transformBatchSize, transformer.DestTransformerStage)
response = proc.transformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize)

destTransformationStat := proc.newDestinationTransformationStat(sourceID, workspaceID, transformAt, destination)
destTransformationStat.transformTime.Since(s)
Expand Down
Loading

0 comments on commit 83fc2ed

Please sign in to comment.