Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add 'config' field to proxy request for v1 #4264

Merged
merged 8 commits into from
Jan 4, 2024
3 changes: 2 additions & 1 deletion router/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ type ProxyRequestMetadata struct {

type ProxyRequestPayload struct {
integrations.PostParametersT
Metadata []ProxyRequestMetadata `json:"metadata"`
Metadata []ProxyRequestMetadata `json:"metadata"`
DestinationConfig map[string]interface{} `json:"destinationConfig"`
}

type ProxyRequestParams struct {
Expand Down
6 changes: 5 additions & 1 deletion router/transformer/transformer_proxy_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,14 @@ func TestV1Adapter(t *testing.T) {
DontBatch: false,
},
},
DestinationConfig: map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
},
DestName: "testDestType",
}
expectedPayload := `{"type":"a","endpoint":"a.com","method":"","userId":"","headers":null,"params":null,"body":{"jobId":1},"files":null,"metadata":[{"jobId":1,"attemptNum":0,"userId":"","sourceId":"","destinationId":"","workspaceId":"","secret":null,"dontBatch":true},{"jobId":2,"attemptNum":0,"userId":"","sourceId":"","destinationId":"","workspaceId":"","secret":null,"dontBatch":false}]}`
expectedPayload := `{"type":"a","endpoint":"a.com","method":"","userId":"","headers":null,"params":null,"body":{"jobId":1},"files":null,"metadata":[{"jobId":1,"attemptNum":0,"userId":"","sourceId":"","destinationId":"","workspaceId":"","secret":null,"dontBatch":true},{"jobId":2,"attemptNum":0,"userId":"","sourceId":"","destinationId":"","workspaceId":"","secret":null,"dontBatch":false}],"destinationConfig":{"key_1":"val_1","key_2":"val_2"}}`

payload, err := v1Adapter.getPayload(proxyReqParms)
require.Nil(t, err)
Expand Down
24 changes: 24 additions & 0 deletions router/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ func TestProxyRequest(t *testing.T) {
DestinationID: "destination_id",
},
},
DestinationConfig: map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
},
},
{
Expand Down Expand Up @@ -181,6 +185,10 @@ func TestProxyRequest(t *testing.T) {
DestinationID: "destination_id",
},
},
DestinationConfig: map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
},
},
{
Expand Down Expand Up @@ -223,6 +231,10 @@ func TestProxyRequest(t *testing.T) {
DestinationID: "destination_id",
},
},
DestinationConfig: map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
},
},
{
Expand Down Expand Up @@ -264,6 +276,10 @@ func TestProxyRequest(t *testing.T) {
DestinationID: "destination_id",
},
},
DestinationConfig: map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
},
},
{
Expand Down Expand Up @@ -303,6 +319,10 @@ func TestProxyRequest(t *testing.T) {
DestinationID: "destination_id",
},
},
DestinationConfig: map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
},
},
{
Expand Down Expand Up @@ -336,6 +356,10 @@ func TestProxyRequest(t *testing.T) {
},
Files: map[string]interface{}{},
},
DestinationConfig: map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
},
},
}
Expand Down
5 changes: 3 additions & 2 deletions router/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,8 +762,9 @@ func (w *worker) proxyRequest(ctx context.Context, destinationJob types.Destinat
proxyReqparams := &transformer.ProxyRequestParams{
DestName: w.rt.destType,
ResponseData: transformer.ProxyRequestPayload{
PostParametersT: val,
Metadata: m,
PostParametersT: val,
Metadata: m,
DestinationConfig: destinationJob.Destination.Config,
},
Adapter: transformer.NewTransformerProxyAdapter(w.rt.transformerFeaturesService.TransformerProxyVersion(), w.rt.logger),
}
Expand Down
121 changes: 85 additions & 36 deletions router/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/rudderlabs/rudder-server/router/types"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
Expand Down Expand Up @@ -181,18 +182,29 @@ var _ = Describe("Proxy Request", func() {

mockTransformer.EXPECT().ProxyRequest(gomock.Any(), gomock.Any()).
Times(1).
Return(transformer.ProxyRequestResponse{
ProxyRequestStatusCode: 200,
ProxyRequestResponseBody: "OK",
RespContentType: "application/json",
RespStatusCodes: map[int64]int{
1: 200,
2: 201,
},
RespBodys: map[int64]string{
1: "ok1",
2: "ok2",
},
DoAndReturn(func(ctx context.Context, proxyReqParams *transformer.ProxyRequestParams) transformer.ProxyRequestResponse {
Expect(len(proxyReqParams.ResponseData.Metadata)).To(Equal(2))
Expect(proxyReqParams.ResponseData.Metadata[0].JobID).To(Equal(int64(1)))
Expect(proxyReqParams.ResponseData.Metadata[1].JobID).To(Equal(int64(2)))
Expect(proxyReqParams.ResponseData.DestinationConfig).To(Equal(map[string]interface{}{
"x": map[string]interface{}{
"y": "z",
},
}))

return transformer.ProxyRequestResponse{
ProxyRequestStatusCode: 200,
ProxyRequestResponseBody: "OK",
RespContentType: "application/json",
RespStatusCodes: map[int64]int{
1: 200,
2: 201,
},
RespBodys: map[int64]string{
1: "ok1",
2: "ok2",
},
}
})

router.Setup(gaDestinationDefinition, logger.NOP, conf, c.mockBackendConfig, c.mockRouterJobsDB, c.mockProcErrorsDB, transientsource.NewEmptyService(), rsources.NewNoOpService(), transformerFeaturesService.NewNoOpService(), destinationdebugger.NewNoOpService(), throttler.NewNoOpThrottlerFactory())
Expand Down Expand Up @@ -220,6 +232,11 @@ var _ = Describe("Proxy Request", func() {
},
Destination: backendconfig.DestinationT{
ID: gaDestinationID,
Config: map[string]interface{}{
"x": map[string]interface{}{
"y": "z",
},
},
},
Batched: false,
StatusCode: 200,
Expand Down Expand Up @@ -256,18 +273,29 @@ var _ = Describe("Proxy Request", func() {

mockTransformer.EXPECT().ProxyRequest(gomock.Any(), gomock.Any()).
Times(1).
Return(transformer.ProxyRequestResponse{
ProxyRequestStatusCode: 400,
ProxyRequestResponseBody: "Err",
RespContentType: "application/json",
RespStatusCodes: map[int64]int{
1: 400,
2: 401,
},
RespBodys: map[int64]string{
1: "err1",
2: "err2",
},
DoAndReturn(func(ctx context.Context, proxyReqParams *transformer.ProxyRequestParams) transformer.ProxyRequestResponse {
Expect(len(proxyReqParams.ResponseData.Metadata)).To(Equal(2))
Expect(proxyReqParams.ResponseData.Metadata[0].JobID).To(Equal(int64(1)))
Expect(proxyReqParams.ResponseData.Metadata[1].JobID).To(Equal(int64(2)))
Expect(proxyReqParams.ResponseData.DestinationConfig).To(Equal(map[string]interface{}{
"x": map[string]interface{}{
"y": "z",
},
}))

return transformer.ProxyRequestResponse{
ProxyRequestStatusCode: 400,
ProxyRequestResponseBody: "Err",
RespContentType: "application/json",
RespStatusCodes: map[int64]int{
1: 400,
2: 401,
},
RespBodys: map[int64]string{
1: "err1",
2: "err2",
},
}
})

router.Setup(gaDestinationDefinition, logger.NOP, conf, c.mockBackendConfig, c.mockRouterJobsDB, c.mockProcErrorsDB, transientsource.NewEmptyService(), rsources.NewNoOpService(), transformerFeaturesService.NewNoOpService(), destinationdebugger.NewNoOpService(), throttler.NewNoOpThrottlerFactory())
Expand Down Expand Up @@ -295,6 +323,11 @@ var _ = Describe("Proxy Request", func() {
},
Destination: backendconfig.DestinationT{
ID: gaDestinationID,
Config: map[string]interface{}{
"x": map[string]interface{}{
"y": "z",
},
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
Config: map[string]interface{}{
"auth": map[string]interface{}{
Expand Down Expand Up @@ -338,18 +371,29 @@ var _ = Describe("Proxy Request", func() {

mockTransformer.EXPECT().ProxyRequest(gomock.Any(), gomock.Any()).
Times(1).
Return(transformer.ProxyRequestResponse{
ProxyRequestStatusCode: 400,
ProxyRequestResponseBody: "Err",
RespContentType: "application/json",
RespStatusCodes: map[int64]int{
1: 500,
2: 501,
},
RespBodys: map[int64]string{
1: "err1",
2: "err2",
},
DoAndReturn(func(ctx context.Context, proxyReqParams *transformer.ProxyRequestParams) transformer.ProxyRequestResponse {
Expect(len(proxyReqParams.ResponseData.Metadata)).To(Equal(2))
Expect(proxyReqParams.ResponseData.Metadata[0].JobID).To(Equal(int64(1)))
Expect(proxyReqParams.ResponseData.Metadata[1].JobID).To(Equal(int64(2)))
Expect(proxyReqParams.ResponseData.DestinationConfig).To(Equal(map[string]interface{}{
"x": map[string]interface{}{
"y": "z",
},
}))

return transformer.ProxyRequestResponse{
ProxyRequestStatusCode: 400,
ProxyRequestResponseBody: "Err",
RespContentType: "application/json",
RespStatusCodes: map[int64]int{
1: 500,
2: 501,
},
RespBodys: map[int64]string{
1: "err1",
2: "err2",
},
}
})

router.Setup(gaDestinationDefinition, logger.NOP, conf, c.mockBackendConfig, c.mockRouterJobsDB, c.mockProcErrorsDB, transientsource.NewEmptyService(), rsources.NewNoOpService(), transformerFeaturesService.NewNoOpService(), destinationdebugger.NewNoOpService(), throttler.NewNoOpThrottlerFactory())
Expand Down Expand Up @@ -377,6 +421,11 @@ var _ = Describe("Proxy Request", func() {
},
Destination: backendconfig.DestinationT{
ID: gaDestinationID,
Config: map[string]interface{}{
"x": map[string]interface{}{
"y": "z",
},
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
Config: map[string]interface{}{
"auth": map[string]interface{}{
Expand Down
Loading