Skip to content

Commit

Permalink
chore: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jul 7, 2023
1 parent 83fc2ed commit 65f27f0
Show file tree
Hide file tree
Showing 17 changed files with 436 additions and 464 deletions.
1 change: 0 additions & 1 deletion app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ func TestDynamicClusterManager(t *testing.T) {
processor.BackendConfig = mockBackendConfig
processor.Transformer = mockTransformer
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1)

rtFactory := &router.Factory{
Logger: logger.NOP,
Expand Down
3 changes: 1 addition & 2 deletions enterprise/replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ func (handle *Handler) initSourceWorkers(ctx context.Context) {
uploader: handle.uploader,
}
handle.workers[i] = worker
worker.transformer = transformer.NewTransformer()
worker.transformer.Setup(config.Default, handle.log, stats.Default)
worker.transformer = transformer.NewTransformer(config.Default, handle.log, stats.Default)

Check warning on line 152 in enterprise/replay/replay.go

View check run for this annotation

Codecov / codecov/patch

enterprise/replay/replay.go#L152

Added line #L152 was not covered by tests
go worker.workerProcess(ctx)
}
handle.initSourceWorkersChannel <- true
Expand Down
6 changes: 3 additions & 3 deletions enterprise/replay/sourceWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (worker *SourceWorkerT) replayJobsInFile(ctx context.Context, filePath stri

var jobs []*jobsdb.JobT

var transEvents []transformer.TransformerEventT
var transEvents []transformer.TransformerEvent

Check warning on line 115 in enterprise/replay/sourceWorker.go

View check run for this annotation

Codecov / codecov/patch

enterprise/replay/sourceWorker.go#L115

Added line #L115 was not covered by tests
transformationVersionID := config.GetString("TRANSFORMATION_VERSION_ID", "")

for sc.Scan() {
Expand Down Expand Up @@ -150,14 +150,14 @@ func (worker *SourceWorkerT) replayJobsInFile(ctx context.Context, filePath stri

messageID := uuid.New().String()

metadata := transformer.MetadataT{
metadata := transformer.Metadata{

Check warning on line 153 in enterprise/replay/sourceWorker.go

View check run for this annotation

Codecov / codecov/patch

enterprise/replay/sourceWorker.go#L153

Added line #L153 was not covered by tests
MessageID: messageID,
DestinationID: gjson.GetBytes(copyLineBytes, "parameters.destination_id").String(),
}

transformation := backendconfig.TransformationT{VersionID: config.GetString("TRANSFORMATION_VERSION_ID", "")}

transEvent := transformer.TransformerEventT{
transEvent := transformer.TransformerEvent{

Check warning on line 160 in enterprise/replay/sourceWorker.go

View check run for this annotation

Codecov / codecov/patch

enterprise/replay/sourceWorker.go#L160

Added line #L160 was not covered by tests
Message: message,
Metadata: metadata,
Destination: backendconfig.DestinationT{Transformations: []backendconfig.TransformationT{transformation}},
Expand Down
27 changes: 6 additions & 21 deletions mocks/processor/transformer/mock_transformer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions processor/eventfilter/eventfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func GetSupportedMessageEvents(destination *backendconfig.DestinationT) ([]strin
}

type AllowTransformerEventParams struct {
TransformerEvent *transformer.TransformerEventT
TransformerEvent *transformer.TransformerEvent
SupportedMessageTypes []string
}

Expand Down Expand Up @@ -123,13 +123,13 @@ Currently this method supports below validations(executed in the same order):
2. Validate if the event is sendable to destination based on connectionMode, sourceType & messageType
*/
func AllowEventToDestTransformation(transformerEvent *transformer.TransformerEventT, supportedMsgTypes []string) (bool, *transformer.TransformerResponseT) {
func AllowEventToDestTransformation(transformerEvent *transformer.TransformerEvent, supportedMsgTypes []string) (bool, *transformer.TransformerResponse) {
// MessageType filtering -- STARTS
messageType := strings.TrimSpace(strings.ToLower(getMessageType(&transformerEvent.Message)))
if messageType == "" {
// We will abort the event
errMessage := "Invalid message type. Type assertion failed"
resp := &transformer.TransformerResponseT{
resp := &transformer.TransformerResponse{
Output: transformerEvent.Message, StatusCode: 400,
Metadata: transformerEvent.Metadata,
Error: errMessage,
Expand Down Expand Up @@ -193,7 +193,7 @@ Example:
...
}
*/
func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterParams) (bool, *transformer.TransformerResponseT) {
func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterParams) (bool, *transformer.TransformerResponse) {
destination := connectionModeFilterParams.Destination
srcType := strings.TrimSpace(connectionModeFilterParams.SrcType)
messageType := connectionModeFilterParams.Event.MessageType
Expand Down
8 changes: 4 additions & 4 deletions processor/integrations/integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

var (
jsonFast = jsoniter.ConfigCompatibleWithStandardLibrary
json = jsoniter.ConfigCompatibleWithStandardLibrary
postParametersTFields []string
)

Expand Down Expand Up @@ -53,7 +53,7 @@ type TransResponseT struct {

func CollectDestErrorStats(input []byte) {
var integrationStat TransStatsT
err := jsonFast.Unmarshal(input, &integrationStat)
err := json.Unmarshal(input, &integrationStat)
if err == nil {
if len(integrationStat.StatTags) > 0 {
stats.Default.NewTaggedStat("integration.failure_detailed", stats.CountType, integrationStat.StatTags).Increment()
Expand All @@ -63,7 +63,7 @@ func CollectDestErrorStats(input []byte) {

func CollectIntgTransformErrorStats(input []byte) {
var integrationStats []TransStatsT
err := jsonFast.Unmarshal(input, &integrationStats)
err := json.Unmarshal(input, &integrationStats)
if err == nil {
for _, integrationStat := range integrationStats {
if len(integrationStat.StatTags) > 0 {
Expand All @@ -75,7 +75,7 @@ func CollectIntgTransformErrorStats(input []byte) {

// GetPostInfo parses the transformer response
func ValidatePostInfo(transformRawParams PostParametersT) error {
transformRaw, err := jsonFast.Marshal(transformRawParams)
transformRaw, err := json.Marshal(transformRawParams)
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"sync"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"

destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
Expand Down Expand Up @@ -87,7 +91,7 @@ func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDb, esDB *job
opts ...Opts,
) *LifecycleManager {
proc := &LifecycleManager{
Handle: NewHandle(transformer.NewTransformer()),
Handle: NewHandle(transformer.NewTransformer(config.Default, logger.NewLogger().Child("processor"), stats.Default)),
mainCtx: ctx,
gatewayDB: gwDb,
routerDB: rtDb,
Expand Down
10 changes: 2 additions & 8 deletions processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"

transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"

destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
Expand Down Expand Up @@ -236,9 +234,7 @@ func TestProcessorManager(t *testing.T) {
},
)
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Do(func(*config.Config, logger.Logger, stats.Stats) {
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
})
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1)
processor.BackendConfig = mockBackendConfig
processor.Handle.transformer = mockTransformer
Expand Down Expand Up @@ -276,9 +272,7 @@ func TestProcessorManager(t *testing.T) {
)

mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
mockTransformer.EXPECT().Setup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Do(func(*config.Config, logger.Logger, stats.Stats) {
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
})
processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures)
mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1)

require.NoError(t, processor.Start())
Expand Down
Loading

0 comments on commit 65f27f0

Please sign in to comment.