Skip to content

Commit

Permalink
feat: onboard new generic destination sftp (#4601)
Browse files Browse the repository at this point in the history
* feat: add stubs for sftp destination manager

* refactor: async destination logic

* refactor: sendJobsToStorage for sftp destinations

* feat: onboard new destination sftp

* feat: add action support and update sftp max size logic

* feat: add file format

* refactor: using struct for unmarshalling

* refactor: unexport structs and use json decoder

* test: add testcases

* feat: implement Transform for yandeskmetrica

* fix: add yandex metrica in async destinations list

* refactor: address review comments

* chore: bump go-kit version

* refactor: address review comments

* refactor: address review comments

* refactor: address review comments

* refactor: address review comments

* refactor: address review comments

* chore: remove extra line

* chore: fix manager instance name

* fix: file upload error

---------

Co-authored-by: Dilip Kola <kdilipkola@gmail.com>
Co-authored-by: Dilip Kola <33080863+koladilip@users.noreply.github.com>
Co-authored-by: ItsSudip <sudip.paul1997@gmail.com>
  • Loading branch information
4 people committed May 3, 2024
1 parent 566f430 commit c6a28bb
Show file tree
Hide file tree
Showing 27 changed files with 1,116 additions and 99 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ require (
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/rudderlabs/bing-ads-go-sdk v0.2.1
github.com/rudderlabs/compose-test v0.1.3
github.com/rudderlabs/rudder-go-kit v0.29.0
github.com/rudderlabs/rudder-go-kit v0.30.0
github.com/rudderlabs/rudder-observability-kit v0.0.3
github.com/rudderlabs/rudder-schemas v0.4.0
github.com/rudderlabs/sql-tunnels v0.1.6
Expand Down Expand Up @@ -228,6 +228,7 @@ require (
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -252,6 +253,7 @@ require (
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/sftp v1.13.6 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -960,8 +960,8 @@ github.com/rudderlabs/compose-test v0.1.3 h1:uyep6jDCIF737sfv4zIaMsKRQKX95IDz5Xb
github.com/rudderlabs/compose-test v0.1.3/go.mod h1:tuvS1eQdSfwOYv1qwyVAcpdJxPLQXJgy5xGDd/9XmMg=
github.com/rudderlabs/parquet-go v0.0.2 h1:ZXRdZdimB0PdJtmxeSSxfI0fDQ3kZjwzBxRi6Ut1J8k=
github.com/rudderlabs/parquet-go v0.0.2/go.mod h1:g6guum7o8uhj/uNhunnt7bw5Vabu/goI5i21/3fnxWQ=
github.com/rudderlabs/rudder-go-kit v0.29.0 h1:zz70OGuT9lCVSoUkLuBKNfGaR9Sz+QZwdu+T4F8ItdY=
github.com/rudderlabs/rudder-go-kit v0.29.0/go.mod h1:dSJQHi1yR7wWps5BVQFpL56eRXk1bwCRDbuFDeplYSM=
github.com/rudderlabs/rudder-go-kit v0.30.0 h1:JV5NlnvAboh//2mo2sV3PDXuHPX5Y6CtjZXibMc1wWM=
github.com/rudderlabs/rudder-go-kit v0.30.0/go.mod h1:Io1s43ApHrIMUUrz0Z2j+zb9p8r49NKT5YUzMIh62Q8=
github.com/rudderlabs/rudder-observability-kit v0.0.3 h1:vZtuZRkGX+6rjaeKtxxFE2YYP6QlmAcVcgecTOjvz+Q=
github.com/rudderlabs/rudder-observability-kit v0.0.3/go.mod h1:6UjAh3H6rkE0fFLh7z8ZGQEQbKtUkRfhWOf/OUhfqW8=
github.com/rudderlabs/rudder-schemas v0.4.0 h1:CkpmO8NI0OFMcFyD6WAGN/gIsmqyrC+Bd/htb9eBhpo=
Expand Down
76 changes: 76 additions & 0 deletions mocks/batchrouter/sftp/mock_sftp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"time"

"github.com/samber/lo"
"github.com/tidwall/gjson"

"github.com/rudderlabs/bing-ads-go-sdk/bingads"
"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
router_utils "github.com/rudderlabs/rudder-server/router/utils"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand All @@ -29,6 +31,10 @@ func NewBingAdsBulkUploader(destName string, service bingads.BulkServiceI, clien
}
}

// Transform extracts the "body.JSON" value from the job's event payload and
// hands it, together with the job ID, to common.GetMarshalledData, returning
// whatever that helper returns.
func (*BingAdsBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
	body := gjson.GetBytes(job.EventPayload, "body.JSON").String()
	return common.GetMarshalledData(body, job.JobID)
}

/*
This function creates at most 3 zip files from the text file created by the batchrouter
It takes the text file path as input and returns the zip file path
Expand Down
53 changes: 50 additions & 3 deletions router/batchrouter/asyncdestinationmanager/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,56 @@ package common

import (
stdjson "encoding/json"
"net/http"
"strings"
"sync"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/config"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
)

type AsyncDestinationManager interface {
var json = jsoniter.ConfigCompatibleWithStandardLibrary

// AsyncUploadAndTransformManager is the subset of async destination behaviour
// that covers preparing a job's payload and uploading a batch. It is embedded
// by AsyncDestinationManager.
type AsyncUploadAndTransformManager interface {
// Upload receives the destination's AsyncDestinationStruct and returns the
// resulting AsyncUploadOutput.
Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput
// Transform converts a single job into a string payload, or returns an
// error when the job cannot be converted.
Transform(job *jobsdb.JobT) (string, error)
}

// AsyncDestinationManager is the full contract of an async destination: the
// upload/transform operations of AsyncUploadAndTransformManager plus polling
// and upload statistics retrieval.
type AsyncDestinationManager interface {
AsyncUploadAndTransformManager
// Poll takes an AsyncPoll input and returns a PollStatusResponse.
// NOTE(review): presumably reports the progress of a previous Upload —
// confirm against the concrete implementations.
Poll(pollInput AsyncPoll) PollStatusResponse
// GetUploadStats takes a GetUploadStatsInput and returns a
// GetUploadStatsResponse.
GetUploadStats(UploadStatsInput GetUploadStatsInput) GetUploadStatsResponse
}

var AsyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BING_ADS", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS"}
// SimpleAsyncDestinationManager wraps an AsyncUploadAndTransformManager.
// Its Upload and Transform methods forward to the wrapped value, while its
// Poll and GetUploadStats methods return fixed http.StatusOK responses.
type SimpleAsyncDestinationManager struct {
// UploaderAndTransformer is the implementation that Upload and Transform
// delegate to.
UploaderAndTransformer AsyncUploadAndTransformManager
}

// Upload forwards the call unchanged to the wrapped UploaderAndTransformer
// and returns its output.
func (m SimpleAsyncDestinationManager) Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput {
	delegate := m.UploaderAndTransformer
	return delegate.Upload(asyncDestStruct)
}

// Poll ignores its input and always answers with a completed status and
// http.StatusOK; the wrapped uploader is not consulted.
func (m SimpleAsyncDestinationManager) Poll(AsyncPoll) PollStatusResponse {
	var status PollStatusResponse
	status.StatusCode = http.StatusOK
	status.Complete = true
	return status
}

// GetUploadStats ignores its input and always returns a response whose only
// populated field is StatusCode, set to http.StatusOK.
func (m SimpleAsyncDestinationManager) GetUploadStats(GetUploadStatsInput) GetUploadStatsResponse {
	var stats GetUploadStatsResponse
	stats.StatusCode = http.StatusOK
	return stats
}

// Transform forwards the job unchanged to the wrapped UploaderAndTransformer
// and returns its payload and error.
func (m SimpleAsyncDestinationManager) Transform(job *jobsdb.JobT) (string, error) {
	delegate := m.UploaderAndTransformer
	return delegate.Transform(job)
}

type PollStatusResponse struct {
Complete bool
Expand Down Expand Up @@ -138,7 +170,22 @@ type GetUploadStatsResponse struct {
}

// GetTransformedData looks up the "body.JSON" path in payload and returns the
// matched value rendered as a string.
func GetTransformedData(payload stdjson.RawMessage) string {
return gjson.Get(string(payload), "body.JSON").String()
return gjson.GetBytes(payload, "body.JSON").String()
}

// GetMarshalledData decodes payload into the Message field of an AsyncJob,
// attaches a Metadata map holding the job ID under the "job_id" key, and
// returns the JSON encoding of the resulting AsyncJob.
//
// An empty string and the underlying error are returned when either the
// decoding of payload or the encoding of the job fails.
func GetMarshalledData(payload string, jobID int64) (string, error) {
	var asyncJob AsyncJob
	if err := json.Unmarshal([]byte(payload), &asyncJob.Message); err != nil {
		return "", err
	}

	metadata := make(map[string]interface{})
	metadata["job_id"] = jobID
	asyncJob.Metadata = metadata

	encoded, err := json.Marshal(asyncJob)
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}

func GetBatchRouterConfigInt64(key, destType string, defaultValue int64) int64 {
Expand Down
13 changes: 10 additions & 3 deletions router/batchrouter/asyncdestinationmanager/common/manager.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package common

import (
"errors"
"fmt"

"github.com/rudderlabs/rudder-server/jobsdb"
)

type InvalidManager struct{}

func (f *InvalidManager) Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput {
// Transform never produces a payload: it always returns an empty string and
// an "invalid job" error, regardless of the job passed in.
func (*InvalidManager) Transform(job *jobsdb.JobT) (string, error) {
	err := errors.New("invalid job")
	return "", err
}

func (*InvalidManager) Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput {
abortedJobIDs := append(asyncDestStruct.ImportingJobIDs, asyncDestStruct.FailedJobIDs...)
return AsyncUploadOutput{
AbortJobIDs: abortedJobIDs,
Expand All @@ -17,13 +24,13 @@ func (f *InvalidManager) Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUp
}
}

func (f *InvalidManager) Poll(_ AsyncPoll) PollStatusResponse {
// Poll ignores its input and always returns a response with StatusCode 400.
func (*InvalidManager) Poll(_ AsyncPoll) PollStatusResponse {
	var resp PollStatusResponse
	resp.StatusCode = 400
	return resp
}

func (f *InvalidManager) GetUploadStats(_ GetUploadStatsInput) GetUploadStatsResponse {
// GetUploadStats ignores its input and always returns a response with
// StatusCode 400.
func (*InvalidManager) GetUploadStats(_ GetUploadStatsInput) GetUploadStatsResponse {
	var resp GetUploadStatsResponse
	resp.StatusCode = 400
	return resp
Expand Down
20 changes: 20 additions & 0 deletions router/batchrouter/asyncdestinationmanager/common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package common

import "slices"

var (
	// asyncDestinations lists the destination names treated as "regular"
	// async destinations.
	asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS"}
	// sftpDestinations lists the destination names treated as SFTP
	// destinations.
	sftpDestinations = []string{"SFTP"}
)

// IsSFTPDestination reports whether destination is one of the SFTP
// destination names.
func IsSFTPDestination(destination string) bool {
	return slices.Contains(sftpDestinations, destination)
}

// IsAsyncRegularDestination reports whether destination is one of the regular
// (non-SFTP) async destination names.
func IsAsyncRegularDestination(destination string) bool {
	return slices.Contains(asyncDestinations, destination)
}

// IsAsyncDestination reports whether destination is either a regular async
// destination or an SFTP destination.
//
// The two lists are checked in place rather than concatenated with append:
// appending to a package-level slice allocates a fresh backing array on every
// call, and would write into the shared array (a data race under concurrent
// callers) if that slice ever had spare capacity.
func IsAsyncDestination(destination string) bool {
	return IsAsyncRegularDestination(destination) || IsSFTPDestination(destination)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"time"

"github.com/samber/lo"
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
)

Expand All @@ -23,6 +25,10 @@ func (b *EloquaBulkUploader) createAsyncUploadErrorOutput(errorString string, er
}
}

// Transform extracts the "body.JSON" value from the job's event payload and
// hands it, together with the job ID, to common.GetMarshalledData, returning
// whatever that helper returns.
func (*EloquaBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
	body := gjson.GetBytes(job.EventPayload, "body.JSON").String()
	return common.GetMarshalledData(body, job.JobID)
}

func (b *EloquaBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
destination := asyncDestStruct.Destination
uploadRetryableStat := stats.Default.NewTaggedStat("events_over_prescribed_limit", stats.CountType, map[string]string{
Expand Down
40 changes: 20 additions & 20 deletions router/batchrouter/asyncdestinationmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,16 @@ package asyncdestinationmanager
import (
"errors"

jsoniter "github.com/json-iterator/go"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
bingads "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/bing-ads"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/eloqua"
marketobulkupload "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/sftp"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/yandexmetrica"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

// GetMarshalledData decodes payload into the Message field of a
// common.AsyncJob, attaches a Metadata map holding the job ID under the
// "job_id" key, and returns the JSON encoding of the resulting job.
//
// It panics when payload cannot be decoded or the job cannot be encoded.
func GetMarshalledData(payload string, jobID int64) string {
	var job common.AsyncJob
	if err := json.Unmarshal([]byte(payload), &job.Message); err != nil {
		panic("Unmarshalling Transformer Response Failed")
	}

	metadata := make(map[string]interface{})
	metadata["job_id"] = jobID
	job.Metadata = metadata

	encoded, err := json.Marshal(job)
	if err != nil {
		panic("Marshalling Response Payload Failed")
	}
	return string(encoded)
}

func NewManager(destination *backendconfig.DestinationT, backendConfig backendconfig.BackendConfig) (common.AsyncDestinationManager, error) {
func newRegularManager(destination *backendconfig.DestinationT, backendConfig backendconfig.BackendConfig) (common.AsyncDestinationManager, error) {
switch destination.DestinationDefinition.Name {
case "BINGADS_AUDIENCE":
return bingads.NewManager(destination, backendConfig)
Expand All @@ -43,3 +25,21 @@ func NewManager(destination *backendconfig.DestinationT, backendConfig backendco
}
return nil, errors.New("invalid destination type")
}

// newSFTPManager builds the manager for a destination whose definition name
// is "SFTP" by calling sftp.NewManager. Any other name yields a nil manager
// and an "invalid destination type" error.
func newSFTPManager(destination *backendconfig.DestinationT) (common.AsyncDestinationManager, error) {
	if destination.DestinationDefinition.Name == "SFTP" {
		return sftp.NewManager(destination)
	}
	return nil, errors.New("invalid destination type")
}

// NewManager picks the constructor matching the destination's definition
// name: regular async destinations go through newRegularManager, SFTP
// destinations through newSFTPManager. A name in neither group yields a nil
// manager and an "invalid destination type" error.
func NewManager(destination *backendconfig.DestinationT, backendConfig backendconfig.BackendConfig) (common.AsyncDestinationManager, error) {
	if common.IsAsyncRegularDestination(destination.DestinationDefinition.Name) {
		return newRegularManager(destination, backendConfig)
	}
	if common.IsSFTPDestination(destination.DestinationDefinition.Name) {
		return newSFTPManager(destination)
	}
	return nil, errors.New("invalid destination type")
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ func extractJobStats(keyMap map[string]interface{}, importingJobIDs []int64, sta
return succesfulJobIDs, failedJobIDsTrans
}

// Transform extracts the "body.JSON" value from the job's event payload and
// hands it, together with the job ID, to common.GetMarshalledData, returning
// whatever that helper returns.
func (*MarketoBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
	body := gjson.GetBytes(job.EventPayload, "body.JSON").String()
	return common.GetMarshalledData(body, job.JobID)
}

func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
destination := asyncDestStruct.Destination
destinationID := destination.ID
Expand Down

0 comments on commit c6a28bb

Please sign in to comment.