Skip to content

Commit

Permalink
feat: add transform function to async destination manager
Browse files Browse the repository at this point in the history
  • Loading branch information
koladilip committed Apr 16, 2024
1 parent c41ae57 commit 95e3e1d
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
router_utils "github.com/rudderlabs/rudder-server/router/utils"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand All @@ -29,6 +30,10 @@ func NewBingAdsBulkUploader(destName string, service bingads.BulkServiceI, clien
}
}

func (b *BingAdsBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
return common.GetMarshalledData(job), nil

Check warning on line 34 in router/batchrouter/asyncdestinationmanager/bing-ads/bulk_uploader.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/bing-ads/bulk_uploader.go#L33-L34

Added lines #L33 - L34 were not covered by tests
}

/*
This function create at most 3 zip files from the text file created by the batchrouter
It takes the text file path as input and returns the zip file path
Expand Down
17 changes: 17 additions & 0 deletions router/batchrouter/asyncdestinationmanager/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type AsyncDestinationManager interface {
Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput
Poll(pollInput AsyncPoll) PollStatusResponse
GetUploadStats(UploadStatsInput GetUploadStatsInput) GetUploadStatsResponse
Transform(job *jobsdb.JobT) (string, error)
}

var AsyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BING_ADS", "ELOQUA"}
Expand Down Expand Up @@ -141,6 +142,22 @@ func GetTransformedData(payload stdjson.RawMessage) string {
return gjson.Get(string(payload), "body.JSON").String()
}

func GetMarshalledData(job *jobsdb.JobT) string {
payload := GetTransformedData(job.EventPayload)
var asyncJob AsyncJob
err := stdjson.Unmarshal([]byte(payload), &asyncJob.Message)
if err != nil {
panic("Unmarshalling Transformer Response Failed")

Check warning on line 150 in router/batchrouter/asyncdestinationmanager/common/common.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/common/common.go#L145-L150

Added lines #L145 - L150 were not covered by tests
}
asyncJob.Metadata = make(map[string]interface{})
asyncJob.Metadata["job_id"] = job.JobID
responsePayload, err := stdjson.Marshal(asyncJob)
if err != nil {
panic("Marshalling Response Payload Failed")

Check warning on line 156 in router/batchrouter/asyncdestinationmanager/common/common.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/common/common.go#L152-L156

Added lines #L152 - L156 were not covered by tests
}
return string(responsePayload)

Check warning on line 158 in router/batchrouter/asyncdestinationmanager/common/common.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/common/common.go#L158

Added line #L158 was not covered by tests
}

func GetBatchRouterConfigInt64(key, destType string, defaultValue int64) int64 {
destOverrideFound := config.IsSet("BatchRouter." + destType + "." + key)
if destOverrideFound {
Expand Down
7 changes: 7 additions & 0 deletions router/batchrouter/asyncdestinationmanager/common/manager.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package common

import (
"errors"
"fmt"

"github.com/rudderlabs/rudder-server/jobsdb"
)

type InvalidManager struct{}

func (f *InvalidManager) Transform(job *jobsdb.JobT) (string, error) {
return "", errors.New("invalid job")

Check warning on line 13 in router/batchrouter/asyncdestinationmanager/common/manager.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/common/manager.go#L12-L13

Added lines #L12 - L13 were not covered by tests
}

func (f *InvalidManager) Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput {
abortedJobIDs := append(asyncDestStruct.ImportingJobIDs, asyncDestStruct.FailedJobIDs...)
return AsyncUploadOutput{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
)

Expand All @@ -23,6 +24,10 @@ func (b *EloquaBulkUploader) createAsyncUploadErrorOutput(errorString string, er
}
}

func (b *EloquaBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
return common.GetMarshalledData(job), nil

Check warning on line 28 in router/batchrouter/asyncdestinationmanager/eloqua/bulk_uploader.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/eloqua/bulk_uploader.go#L27-L28

Added lines #L27 - L28 were not covered by tests
}

func (b *EloquaBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
destination := asyncDestStruct.Destination
uploadRetryableStat := stats.Default.NewTaggedStat("events_over_prescribed_limit", stats.CountType, map[string]string{
Expand Down
15 changes: 0 additions & 15 deletions router/batchrouter/asyncdestinationmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,6 @@ import (

var json = jsoniter.ConfigCompatibleWithStandardLibrary

func GetMarshalledData(payload string, jobID int64) string {
var job common.AsyncJob
err := json.Unmarshal([]byte(payload), &job.Message)
if err != nil {
panic("Unmarshalling Transformer Response Failed")
}
job.Metadata = make(map[string]interface{})
job.Metadata["job_id"] = jobID
responsePayload, err := json.Marshal(job)
if err != nil {
panic("Marshalling Response Payload Failed")
}
return string(responsePayload)
}

func NewManager(destination *backendconfig.DestinationT, backendConfig backendconfig.BackendConfig) (common.AsyncDestinationManager, error) {
switch destination.DestinationDefinition.Name {
case "BINGADS_AUDIENCE":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ func extractJobStats(keyMap map[string]interface{}, importingJobIDs []int64, sta
return succesfulJobIDs, failedJobIDsTrans
}

func (b *MarketoBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
return common.GetMarshalledData(job), nil

Check warning on line 231 in router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/marketobulkupload.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/marketobulkupload.go#L230-L231

Added lines #L230 - L231 were not covered by tests
}

func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
destination := asyncDestStruct.Destination
destinationID := destination.ID
Expand Down
15 changes: 11 additions & 4 deletions router/batchrouter/handle_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
"github.com/rudderlabs/rudder-server/router/rterror"
"github.com/rudderlabs/rudder-server/router/types"
Expand Down Expand Up @@ -465,13 +464,21 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) {
writeAtBytes := brt.asyncDestinationStruct[destinationID].Size
allowedSize := brt.maxPayloadSizeInBytes
for _, job := range batchJobs.Jobs {
transformedData := common.GetTransformedData(job.EventPayload)
if brt.asyncDestinationStruct[destinationID].Count < brt.maxEventsInABatch &&
!brt.asyncDestinationStruct[destinationID].UploadInProgress &&
brt.asyncDestinationStruct[destinationID].Size < allowedSize {
fileData := asyncdestinationmanager.GetMarshalledData(transformedData, job.JobID)
fileData, err := brt.asyncDestinationStruct[destinationID].Manager.Transform(job)
if err != nil {
failedAsyncJobs := BatchedJobs{
Jobs: []*jobsdb.JobT{job},
Connection: batchJobs.Connection,
TimeWindow: batchJobs.TimeWindow,

Check warning on line 475 in router/batchrouter/handle_async.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/handle_async.go#L470-L475

Added lines #L470 - L475 were not covered by tests
}
brt.updateJobStatus(&failedAsyncJobs, false, err, false)
continue

Check warning on line 478 in router/batchrouter/handle_async.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/handle_async.go#L477-L478

Added lines #L477 - L478 were not covered by tests
}
brt.asyncDestinationStruct[destinationID].Size = brt.asyncDestinationStruct[destinationID].Size + len([]byte(fileData+"\n"))
_, err := file.WriteAt([]byte(fileData+"\n"), int64(writeAtBytes))
_, err = file.WriteAt([]byte(fileData+"\n"), int64(writeAtBytes))

Check warning on line 481 in router/batchrouter/handle_async.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/handle_async.go#L481

Added line #L481 was not covered by tests
if err != nil {
panic(fmt.Errorf("BRT: %s: file write failed : %s", brt.destType, err.Error()))
}
Expand Down

0 comments on commit 95e3e1d

Please sign in to comment.