From f95fa5145939a67d9ef2849bcdf60285d7b200ca Mon Sep 17 00:00:00 2001 From: Anant Jain <62471433+anantjain45823@users.noreply.github.com> Date: Sat, 11 Nov 2023 02:28:28 +0530 Subject: [PATCH] feat: new transformer service to fetch and serve transformer features (#4007) --- app/apphandlers/embeddedAppHandler.go | 10 +- app/apphandlers/gatewayAppHandler.go | 9 +- app/apphandlers/processorAppHandler.go | 8 + app/cluster/integration_test.go | 3 +- gateway/gateway_test.go | 21 +-- gateway/handle_http_auth.go | 1 + gateway/handle_lifecycle.go | 5 +- gateway/internal/types/types.go | 6 +- gateway/response/response.go | 4 + gateway/webhook/setup.go | 22 ++- gateway/webhook/webhook.go | 52 ++++-- gateway/webhook/webhookTransformer.go | 74 ++++++++- gateway/webhook/webhookTransformer_test.go | 65 ++++++++ gateway/webhook/webhook_test.go | 91 +++++++++-- processor/manager.go | 87 +++++----- processor/manager_test.go | 6 +- processor/processor.go | 170 ++++++-------------- processor/processor_test.go | 29 +++- services/transformer/features.go | 67 ++++++++ services/transformer/features_impl.go | 107 ++++++++++++ services/transformer/features_impl_test.go | 117 ++++++++++++++ services/transformer/features_suite_test.go | 13 ++ 22 files changed, 741 insertions(+), 226 deletions(-) create mode 100644 gateway/webhook/webhookTransformer_test.go create mode 100644 services/transformer/features.go create mode 100644 services/transformer/features_impl.go create mode 100644 services/transformer/features_impl_test.go create mode 100644 services/transformer/features_suite_test.go diff --git a/app/apphandlers/embeddedAppHandler.go b/app/apphandlers/embeddedAppHandler.go index 09a22fe6ad..aa36b531c2 100644 --- a/app/apphandlers/embeddedAppHandler.go +++ b/app/apphandlers/embeddedAppHandler.go @@ -31,6 +31,7 @@ import ( sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source" transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation" "github.com/rudderlabs/rudder-server/services/fileuploader" + "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/services/transientsource" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/payload" @@ -130,6 +131,12 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) return err } + transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{ + PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second), + TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"), + FeaturesRetryMaxAttempts: 10, + }) + // This separate gateway db is created just to be used with gateway because in case of degraded mode, // the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it // will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode. @@ -257,6 +264,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) transientSources, fileUploaderProvider, rsourcesService, + transformerFeaturesService, destinationHandle, transformationhandle, enrichers, @@ -319,7 +327,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) ctx, config, logger.NewLogger().Child("gateway"), stats.Default, a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDBForWrite, - rateLimiter, a.versionHandler, rsourcesService, sourceHandle, + rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle, ) if err != nil { return fmt.Errorf("could not setup gateway: %w", err) diff --git a/app/apphandlers/gatewayAppHandler.go b/app/apphandlers/gatewayAppHandler.go index 300f3b8073..102178a5ff 100644 --- a/app/apphandlers/gatewayAppHandler.go +++ b/app/apphandlers/gatewayAppHandler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "time" "golang.org/x/sync/errgroup" @@ -19,6 +20,7 @@ import ( "github.com/rudderlabs/rudder-server/services/db" sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source" "github.com/rudderlabs/rudder-server/services/fileuploader" + "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/types/deployment" ) @@ -122,11 +124,16 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) if err != nil { return err } + transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{ + PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second), + TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"), + FeaturesRetryMaxAttempts: 10, + }) err = gw.Setup( ctx, config, logger.NewLogger().Child("gateway"), stats.Default, a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDB, - rateLimiter, a.versionHandler, rsourcesService, sourceHandle, + rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle, ) if err != nil { return fmt.Errorf("failed to setup gateway: %w", err) diff --git a/app/apphandlers/processorAppHandler.go b/app/apphandlers/processorAppHandler.go index cef8753200..ef4df625f8 100644 --- a/app/apphandlers/processorAppHandler.go +++ b/app/apphandlers/processorAppHandler.go @@ -33,6 +33,7 @@ import ( destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination" transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation" "github.com/rudderlabs/rudder-server/services/fileuploader" + "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/services/transientsource" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/payload" @@ -137,6 +138,12 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options return err } + transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{ + PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second), + TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"), + FeaturesRetryMaxAttempts: 10, + }) + gwDBForProcessor := jobsdb.NewForRead( "gw", jobsdb.WithClearDB(options.ClearDB), @@ -247,6 +254,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options transientSources, fileUploaderProvider, rsourcesService, + transformerFeaturesService, destinationHandle, transformationhandle, enrichers, diff --git a/app/cluster/integration_test.go b/app/cluster/integration_test.go index 3c5a39fd53..57b219d056 100644 --- a/app/cluster/integration_test.go +++ b/app/cluster/integration_test.go @@ -38,6 +38,7 @@ import ( transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation" "github.com/rudderlabs/rudder-server/services/fileuploader" "github.com/rudderlabs/rudder-server/services/rsources" + "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/services/transientsource" "github.com/rudderlabs/rudder-server/utils/pubsub" "github.com/rudderlabs/rudder-server/utils/types/servermode" @@ -208,10 +209,10 @@ func TestDynamicClusterManager(t *testing.T) { transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), rsources.NewNoOpService(), + transformer.NewNoOpService(), destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, - processor.WithFeaturesRetryMaxAttempts(0), ) processor.BackendConfig = mockBackendConfig processor.Transformer = mockTransformer diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index 53ec55e5cd..bf7f1c8c7c 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -45,6 +45,7 @@ import ( mocksTypes "github.com/rudderlabs/rudder-server/mocks/utils/types" sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source" "github.com/rudderlabs/rudder-server/services/rsources" + "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/pubsub" testutils "github.com/rudderlabs/rudder-server/utils/tests" @@ -266,7 +267,7 @@ var _ = Describe("Gateway Enterprise", func() { Context("Suppress users", func() { BeforeEach(func() { gateway = &Handle{} - err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) statsStore = memstats.New() gateway.stats = statsStore @@ -402,7 +403,7 @@ var _ = Describe("Gateway", func() { Context("Initialization", func() { It("should wait for backend config", func() { gateway := &Handle{} - err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) waitForBackendConfigInit(gateway) err = gateway.Shutdown() @@ -435,7 +436,7 @@ var _ = Describe("Gateway", func() { GinkgoT().Setenv("RSERVER_GATEWAY_WEB_PORT", strconv.Itoa(serverPort)) gateway = &Handle{} - err = gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err = gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) waitForBackendConfigInit(gateway) gateway.irh = mockRequestHandler{} @@ -526,7 +527,7 @@ var _ = Describe("Gateway", func() { BeforeEach(func() { gateway = &Handle{} - err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) waitForBackendConfigInit(gateway) statsStore = memstats.New() @@ -854,7 +855,7 @@ var _ = Describe("Gateway", func() { BeforeEach(func() { gateway = &Handle{} - err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) waitForBackendConfigInit(gateway) @@ -959,7 +960,7 @@ var _ = Describe("Gateway", func() { BeforeEach(func() { gateway = &Handle{} conf.Set("Gateway.enableRateLimit", true) - err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) waitForBackendConfigInit(gateway) @@ -1051,7 +1052,7 @@ var _ = Describe("Gateway", func() { BeforeEach(func() { gateway = &Handle{} - err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) waitForBackendConfigInit(gateway) @@ -1334,7 +1335,7 @@ var _ = Describe("Gateway", func() { BeforeEach(func() { gateway = &Handle{} - err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) waitForBackendConfigInit(gateway) }) @@ -1360,7 +1361,7 @@ var _ = Describe("Gateway", func() { ) BeforeEach(func() { gateway = &Handle{} - err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) waitForBackendConfigInit(gateway) }) @@ -1541,7 +1542,7 @@ var _ = Describe("Gateway", func() { var gateway *Handle BeforeEach(func() { gateway = &Handle{} - err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService()) + err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService()) Expect(err).To(BeNil()) waitForBackendConfigInit(gateway) }) diff --git a/gateway/handle_http_auth.go b/gateway/handle_http_auth.go index 74bf44f26d..da6e049359 100644 --- a/gateway/handle_http_auth.go +++ b/gateway/handle_http_auth.go @@ -172,6 +172,7 @@ func sourceToRequestContext(s backendconfig.SourceT) *gwtypes.AuthRequestContext SourceCategory: s.SourceDefinition.Category, SourceDefName: s.SourceDefinition.Name, ReplaySource: s.IsReplaySource(), + Source: s, } if arctx.SourceCategory == "" { arctx.SourceCategory = eventStreamSourceCategory diff --git a/gateway/handle_lifecycle.go b/gateway/handle_lifecycle.go index d7a906940e..4166023570 100644 --- a/gateway/handle_lifecycle.go +++ b/gateway/handle_lifecycle.go @@ -33,6 +33,7 @@ import ( "github.com/rudderlabs/rudder-server/services/diagnostics" "github.com/rudderlabs/rudder-server/services/rsources" rsources_http "github.com/rudderlabs/rudder-server/services/rsources/http" + "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/utils/misc" ) @@ -50,7 +51,7 @@ func (gw *Handle) Setup( config *config.Config, logger logger.Logger, stat stats.Stats, application app.App, backendConfig backendconfig.BackendConfig, jobsDB, errDB jobsdb.JobsDB, rateLimiter throttler.Throttler, versionHandler func(w http.ResponseWriter, r *http.Request), - rsourcesService rsources.JobService, sourcehandle sourcedebugger.SourceDebugger, + rsourcesService rsources.JobService, transformerFeaturesService transformer.FeaturesService, sourcehandle sourcedebugger.SourceDebugger, ) error { gw.config = config gw.logger = logger @@ -118,7 +119,7 @@ func (gw *Handle) Setup( gw.batchUserWorkerBatchRequestQ = make(chan *batchUserWorkerBatchRequestT, gw.conf.maxDBWriterProcess) gw.irh = &ImportRequestHandler{Handle: gw} gw.rrh = &RegularRequestHandler{Handle: gw} - gw.webhook = webhook.Setup(gw, gw.stats) + gw.webhook = webhook.Setup(gw, transformerFeaturesService, gw.stats) whURL, err := url.ParseRequestURI(misc.GetWarehouseURL()) if err != nil { return fmt.Errorf("invalid warehouse URL %s: %w", whURL, err) diff --git a/gateway/internal/types/types.go b/gateway/internal/types/types.go index 07fd719701..19528176d7 100644 --- a/gateway/internal/types/types.go +++ b/gateway/internal/types/types.go @@ -1,6 +1,9 @@ package types -import "github.com/rudderlabs/rudder-server/utils/misc" +import ( + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/utils/misc" +) type ContextKey string @@ -23,6 +26,7 @@ type AuthRequestContext struct { ReplaySource bool SourceJobRunID string SourceTaskRunID string + Source backendconfig.SourceT } func (arctx *AuthRequestContext) SourceTag() string { diff --git a/gateway/response/response.go b/gateway/response/response.go index 603eb9fd20..7860aab940 100644 --- a/gateway/response/response.go +++ b/gateway/response/response.go @@ -58,6 +58,8 @@ const ( ContextDeadlineExceeded = "context deadline exceeded" // GatewayTimeout - Gateway timeout GatewayTimeout = "Gateway timeout" + // ServiceUnavailable - Service unavailable + ServiceUnavailable = "Service unavailable" // NoSourceIdInHeader - Failed to read source id from header NoSourceIdInHeader = "Failed to read source id from header" // InvalidSourceID - Invalid source id @@ -100,6 +102,8 @@ var statusMap = map[string]status{ ErrorInParseMultiform: {message: ErrorInParseMultiform, code: http.StatusBadRequest}, NotRudderEvent: {message: NotRudderEvent, code: http.StatusBadRequest}, ContextDeadlineExceeded: {message: GatewayTimeout, code: http.StatusGatewayTimeout}, + GatewayTimeout: {message: GatewayTimeout, code: http.StatusGatewayTimeout}, + ServiceUnavailable: {message: ServiceUnavailable, code: http.StatusServiceUnavailable}, } // status holds the gateway response status message and code diff --git a/gateway/webhook/setup.go b/gateway/webhook/setup.go index d384eb9aa2..dfb6a24674 100644 --- a/gateway/webhook/setup.go +++ b/gateway/webhook/setup.go @@ -9,14 +9,17 @@ import ( "time" "github.com/hashicorp/go-retryablehttp" - "golang.org/x/sync/errgroup" "github.com/rudderlabs/rudder-go-kit/config" + + "golang.org/x/sync/errgroup" + "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats" gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types" "github.com/rudderlabs/rudder-server/gateway/webhook/model" + "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/utils/misc" ) @@ -42,10 +45,8 @@ func newWebhookStats() *webhookStatsT { return &wStats } -func Setup(gwHandle Gateway, stat stats.Stats, opts ...batchTransformerOption) *HandleT { +func Setup(gwHandle Gateway, transformerFeaturesService transformer.FeaturesService, stat stats.Stats, opts ...batchTransformerOption) *HandleT { webhook := &HandleT{gwHandle: gwHandle, stats: stat, logger: logger.NewLogger().Child("gateway").Child("webhook")} - - sourceTransformerURL := strings.TrimSuffix(config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"), "/") + "/v0/sources" // Number of incoming webhooks that are batched before calling source transformer webhook.config.maxWebhookBatchSize = config.GetReloadableIntVar(32, 1, "Gateway.webhook.maxBatchSize") // Timeout after which batch is formed anyway with whatever webhooks are available @@ -75,9 +76,16 @@ func Setup(gwHandle Gateway, stat stats.Stats, opts ...batchTransformerOption) * for i := 0; i < maxTransformerProcess; i++ { g.Go(misc.WithBugsnag(func() error { bt := batchWebhookTransformerT{ - webhook: webhook, - stats: newWebhookStats(), - sourceTransformerURL: sourceTransformerURL, + webhook: webhook, + stats: newWebhookStats(), + sourceTransformAdapter: func(ctx context.Context) (sourceTransformAdapter, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-transformerFeaturesService.Wait(): + return newSourceTransformAdapter(transformerFeaturesService.SourceTransformerVersion()), nil + } + }, } for _, opt := range opts { opt(&bt) diff --git a/gateway/webhook/webhook.go b/gateway/webhook/webhook.go index 3f352b01d3..541bb86c32 100644 --- a/gateway/webhook/webhook.go +++ b/gateway/webhook/webhook.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/go-retryablehttp" "github.com/samber/lo" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types" @@ -85,9 +86,9 @@ type webhookStatsT struct { } type batchWebhookTransformerT struct { - webhook *HandleT - stats *webhookStatsT - sourceTransformerURL string + webhook *HandleT + stats *webhookStatsT + sourceTransformAdapter func(ctx context.Context) (sourceTransformAdapter, error) } type batchTransformerOption func(bt *batchWebhookTransformerT) @@ -262,6 +263,28 @@ func (webhook *HandleT) batchRequests(sourceDef string, requestQ chan *webhookT) // TODO : return back immediately for blank request body. its waiting till timeout func (bt *batchWebhookTransformerT) batchTransformLoop() { for breq := range bt.webhook.batchRequestQ { + + // If unable to fetch features from transformer, send GatewayTimeout to all requests + // TODO: Remove timeout from here after timeout handler is added in gateway + ctx, cancel := context.WithTimeout(context.Background(), config.GetDurationVar(10, time.Second, "WriteTimeout", "WriteTimeOutInSec")) + sourceTransformAdapter, err := bt.sourceTransformAdapter(ctx) + if err != nil { + for _, req := range breq.batchRequest { + req.done <- transformerResponse{StatusCode: response.GetErrorStatusCode(response.GatewayTimeout), Err: response.GetStatus(response.GatewayTimeout)} + } + cancel() + continue + } + cancel() + + transformerURL, err := sourceTransformAdapter.getTransformerURL(breq.sourceType) + if err != nil { + for _, req := range breq.batchRequest { + req.done <- transformerResponse{StatusCode: response.GetErrorStatusCode(response.ServiceUnavailable), Err: response.GetStatus(response.ServiceUnavailable)} + } + continue + } + var payloadArr [][]byte var webRequests []*webhookT for _, req := range breq.batchRequest { @@ -297,7 +320,14 @@ func (bt *batchWebhookTransformerT) batchTransformLoop() { continue } - payloadArr = append(payloadArr, body) + payload, err := sourceTransformAdapter.getTransformerEvent(req.authContext, body) + if err != nil { + req.done <- transformerResponse{Err: response.GetStatus(response.InvalidWebhookSource)} + continue + } + + payloadArr = append(payloadArr, payload) + webRequests = append(webRequests, req) } @@ -313,7 +343,7 @@ func (bt *batchWebhookTransformerT) batchTransformLoop() { bt.stats.sourceStats[breq.sourceType].numEvents.Count(len(payloadArr)) transformStart := time.Now() - batchResponse := bt.transform(payloadArr, breq.sourceType) + batchResponse := bt.transform(payloadArr, transformerURL) // stats bt.stats.sourceStats[breq.sourceType].sourceTransform.Since(transformStart) @@ -449,12 +479,14 @@ func (webhook *HandleT) countWebhookErrors(sourceType string, arctx *gwtypes.Aut } func (webhook *HandleT) recordWebhookErrors(sourceType, reason string, reqs []*webhookT, statusCode int) { - reqsGroupedBySource := lo.GroupBy(reqs, func(request *webhookT) gwtypes.AuthRequestContext { - return *request.authContext + authCtxs := lo.SliceToMap(reqs, func(request *webhookT) (string, *gwtypes.AuthRequestContext) { + return request.authContext.WriteKey, request.authContext }) - - for si, reqs := range reqsGroupedBySource { - webhook.countWebhookErrors(sourceType, &si, reason, statusCode, len(reqs)) + reqsGroupedBySource := lo.GroupBy(reqs, func(request *webhookT) string { + return request.authContext.WriteKey + }) + for writeKey, reqs := range reqsGroupedBySource { + webhook.countWebhookErrors(sourceType, authCtxs[writeKey], reason, statusCode, len(reqs)) } } diff --git a/gateway/webhook/webhookTransformer.go b/gateway/webhook/webhookTransformer.go index 77d12e2489..5147917db6 100644 --- a/gateway/webhook/webhookTransformer.go +++ b/gateway/webhook/webhookTransformer.go @@ -7,14 +7,79 @@ import ( "fmt" "io" "net/http" - "strings" + "net/url" "time" + "github.com/rudderlabs/rudder-go-kit/config" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types" "github.com/rudderlabs/rudder-server/gateway/response" + "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/misc" ) +type sourceTransformAdapter interface { + getTransformerEvent(authCtx *gwtypes.AuthRequestContext, body []byte) ([]byte, error) + getTransformerURL(sourceType string) (string, error) +} + +type v0Adapter struct{} + +type v1Adapter struct{} + +type V1TransformerEvent struct { + Event json.RawMessage `json:"event"` + Source backendconfig.SourceT `json:"source"` +} + +func (v0 *v0Adapter) getTransformerEvent(authCtx *gwtypes.AuthRequestContext, body []byte) ([]byte, error) { + return body, nil +} + +func (v0 *v0Adapter) getTransformerURL(sourceType string) (string, error) { + return getTransformerURL(transformer.V0, sourceType) +} + +func (v1 *v1Adapter) getTransformerEvent(authCtx *gwtypes.AuthRequestContext, body []byte) ([]byte, error) { + source := authCtx.Source + + v1TransformerEvent := V1TransformerEvent{ + Event: body, + Source: backendconfig.SourceT{ + ID: source.ID, + OriginalID: source.OriginalID, + Name: source.Name, + SourceDefinition: source.SourceDefinition, + Config: source.Config, + Enabled: source.Enabled, + WorkspaceID: source.WorkspaceID, + WriteKey: source.WriteKey, + Transient: source.Transient, + }, + } + + return json.Marshal(v1TransformerEvent) +} + +func (v1 *v1Adapter) getTransformerURL(sourceType string) (string, error) { + return getTransformerURL(transformer.V1, sourceType) +} + +func newSourceTransformAdapter(version string) sourceTransformAdapter { + switch version { + case "v1": + return &v1Adapter{} + } + + return &v0Adapter{} +} + +func getTransformerURL(version, sourceType string) (string, error) { + baseURL := config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090") + return url.JoinPath(baseURL, version, "sources", sourceType) +} + type outputToSource struct { Body []byte `json:"body"` ContentType string `json:"contentType"` @@ -45,17 +110,16 @@ func (bt *batchWebhookTransformerT) markResponseFail(reason string) transformerR return resp } -func (bt *batchWebhookTransformerT) transform(events [][]byte, sourceType string) transformerBatchResponseT { +func (bt *batchWebhookTransformerT) transform(events [][]byte, sourceTransformerURL string) transformerBatchResponseT { bt.stats.sentStat.Count(len(events)) transformStart := time.Now() payload := misc.MakeJSONArray(events) - url := fmt.Sprintf(`%s/%s`, bt.sourceTransformerURL, strings.ToLower(sourceType)) - resp, err := bt.webhook.netClient.Post(url, "application/json; charset=utf-8", bytes.NewBuffer(payload)) + resp, err := bt.webhook.netClient.Post(sourceTransformerURL, "application/json; charset=utf-8", bytes.NewBuffer(payload)) bt.stats.transformTimerStat.Since(transformStart) if err != nil { - err := fmt.Errorf("JS HTTP connection error to source transformer: URL: %v Error: %+v", url, err) + err := fmt.Errorf("JS HTTP connection to source transformer (URL: %q): %w", sourceTransformerURL, err) return transformerBatchResponseT{batchError: err, statusCode: http.StatusServiceUnavailable} } diff --git a/gateway/webhook/webhookTransformer_test.go b/gateway/webhook/webhookTransformer_test.go new file mode 100644 index 0000000000..ba92bf2f09 --- /dev/null +++ b/gateway/webhook/webhookTransformer_test.go @@ -0,0 +1,65 @@ +package webhook + +import ( + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types" + "github.com/rudderlabs/rudder-server/services/transformer" +) + +func TestV0Adapter(t *testing.T) { + v0Adapter := newSourceTransformAdapter(transformer.V0) + + t.Run("should return the right url", func(t *testing.T) { + testSrcType := "testSrcType" + url, err := v0Adapter.getTransformerURL(testSrcType) + require.Nil(t, err) + require.True(t, strings.HasSuffix(url, fmt.Sprintf("/%s/sources/%s", transformer.V0, testSrcType))) + }) + + t.Run("should return the body as is", func(t *testing.T) { + testBody := []byte("testBody") + retBody, err := v0Adapter.getTransformerEvent(nil, testBody) + require.Equal(t, testBody, retBody) + require.Nil(t, err) + }) +} + +func TestV1Adapter(t *testing.T) { + t.Run("should return the right url", func(t *testing.T) { + v1Adapter := newSourceTransformAdapter(transformer.V1) + testSrcType := "testSrcType" + url, err := v1Adapter.getTransformerURL(testSrcType) + require.Nil(t, err) + require.True(t, strings.HasSuffix(url, fmt.Sprintf("/%s/sources/%s", transformer.V1, testSrcType))) + }) + + t.Run("should return the body in v1 format", func(t *testing.T) { + testSrcId := "testSrcId" + testBody := []byte(`{"a": "testBody"}`) + + mockSrc := backendconfig.SourceT{ + ID: testSrcId, + Destinations: []backendconfig.DestinationT{{ID: "testDestId"}}, + } + + v1Adapter := newSourceTransformAdapter(transformer.V1) + + retBody, err := v1Adapter.getTransformerEvent(&gwtypes.AuthRequestContext{Source: mockSrc}, testBody) + require.Nil(t, err) + + v1TransformerEvent := V1TransformerEvent{ + Event: testBody, + Source: backendconfig.SourceT{ID: mockSrc.ID}, + } + expectedBody, err := json.Marshal(v1TransformerEvent) + require.Nil(t, err) + require.Equal(t, expectedBody, retBody) + }) +} diff --git a/gateway/webhook/webhook_test.go b/gateway/webhook/webhook_test.go index 65891d674a..36113f9e83 100644 --- a/gateway/webhook/webhook_test.go +++ b/gateway/webhook/webhook_test.go @@ -23,6 +23,7 @@ import ( gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types" mockWebhook "github.com/rudderlabs/rudder-server/gateway/mocks" "github.com/rudderlabs/rudder-server/gateway/response" + "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/utils/misc" ) @@ -48,9 +49,73 @@ func initWebhook() { logger.Reset() misc.Init() config.Set("Gateway.webhook.maxTransformerProcess", 1) + config.Set("WriteTimeout", "1s") }) } +type mockSourceTransformAdapter struct { + url string +} + +func (v0 *mockSourceTransformAdapter) getTransformerEvent(authCtx *gwtypes.AuthRequestContext, body []byte) ([]byte, error) { + return body, nil +} + +func (v0 *mockSourceTransformAdapter) getTransformerURL(sourceType string) (string, error) { + return v0.url, nil +} + +func getMockSourceTransformAdapterFunc(url string) func(ctx context.Context) (sourceTransformAdapter, error) { + return func(ctx context.Context) (sourceTransformAdapter, error) { + mst := &mockSourceTransformAdapter{} + mst.url = url + return mst, nil + } +} + +func getMockTransformerService() transformer.FeaturesService { + return &mockTransformerService{} +} + +type mockTransformerService struct{} + +func (*mockTransformerService) SourceTransformerVersion() string { + return "random-version" +} + +func (*mockTransformerService) Wait() chan struct{} { + return make(chan struct{}) +} + +func (*mockTransformerService) RouterTransform(destType string) bool { + return false +} + +func TestWebhookBlockTillFeaturesAreFetched(t *testing.T) { + initWebhook() + ctrl := gomock.NewController(t) + mockGW := mockWebhook.NewMockGateway(ctrl) + webhookHandler := Setup(mockGW, getMockTransformerService(), stats.Default) + + mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1) + mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1) + arctx := &gwtypes.AuthRequestContext{ + SourceDefName: sourceDefName, + WriteKey: sampleWriteKey, + } + webhookHandler.Register(sourceDefName) + req := httptest.NewRequest(http.MethodPost, "/v1/webhook", bytes.NewBufferString(sampleJson)) + w := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), gwtypes.CtxParamCallType, "webhook") + ctx = context.WithValue(ctx, gwtypes.CtxParamAuthRequestContext, arctx) + req = req.WithContext(ctx) + webhookHandler.RequestHandler(w, req) + + assert.Equal(t, http.StatusGatewayTimeout, w.Result().StatusCode) + assert.Contains(t, strings.TrimSpace(w.Body.String()), "Gateway timeout") + _ = webhookHandler.Shutdown() +} + func TestWebhookRequestHandlerWithTransformerBatchGeneralError(t *testing.T) { initWebhook() ctrl := gomock.NewController(t) @@ -59,8 +124,8 @@ func TestWebhookRequestHandlerWithTransformerBatchGeneralError(t *testing.T) { http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.Error(w, sampleError, http.StatusBadRequest) })) - webhookHandler := Setup(mockGW, stats.Default, func(bt *batchWebhookTransformerT) { - bt.sourceTransformerURL = transformerServer.URL + webhookHandler := Setup(mockGW, transformer.NewNoOpService(), stats.Default, func(bt *batchWebhookTransformerT) { + bt.sourceTransformAdapter = getMockSourceTransformAdapterFunc(transformerServer.URL) }) mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1) @@ -104,8 +169,8 @@ func TestWebhookRequestHandlerWithTransformerBatchPayloadLengthMismatchError(t * respBody, _ := json.Marshal(responses) _, _ = w.Write(respBody) })) - webhookHandler := Setup(mockGW, stats.Default, func(bt *batchWebhookTransformerT) { - bt.sourceTransformerURL = transformerServer.URL + webhookHandler := Setup(mockGW, transformer.NewNoOpService(), stats.Default, func(bt *batchWebhookTransformerT) { + bt.sourceTransformAdapter = getMockSourceTransformAdapterFunc(transformerServer.URL) }) mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1) @@ -147,8 +212,8 @@ func TestWebhookRequestHandlerWithTransformerRequestError(t *testing.T) { respBody, _ := json.Marshal(responses) _, _ = w.Write(respBody) })) - webhookHandler := Setup(mockGW, stats.Default, func(bt *batchWebhookTransformerT) { - bt.sourceTransformerURL = transformerServer.URL + webhookHandler := Setup(mockGW, transformer.NewNoOpService(), stats.Default, func(bt *batchWebhookTransformerT) { + bt.sourceTransformAdapter = getMockSourceTransformAdapterFunc(transformerServer.URL) }) mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1) @@ -190,8 +255,8 @@ func TestWebhookRequestHandlerWithOutputToSource(t *testing.T) { respBody, _ := json.Marshal(responses) _, _ = w.Write(respBody) })) - webhookHandler := Setup(mockGW, stats.Default, func(bt *batchWebhookTransformerT) { - bt.sourceTransformerURL = transformerServer.URL + webhookHandler := Setup(mockGW, transformer.NewNoOpService(), stats.Default, func(bt *batchWebhookTransformerT) { + bt.sourceTransformAdapter = getMockSourceTransformAdapterFunc(transformerServer.URL) }) mockGW.EXPECT().TrackRequestMetrics("").Times(1) mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1) @@ -232,8 +297,8 @@ func TestWebhookRequestHandlerWithOutputToGateway(t *testing.T) { respBody, _ := json.Marshal(responses) _, _ = w.Write(respBody) })) - webhookHandler := Setup(mockGW, stats.Default, func(bt *batchWebhookTransformerT) { - bt.sourceTransformerURL = transformerServer.URL + webhookHandler := Setup(mockGW, transformer.NewNoOpService(), stats.Default, func(bt *batchWebhookTransformerT) { + bt.sourceTransformAdapter = getMockSourceTransformAdapterFunc(transformerServer.URL) }) mockGW.EXPECT().TrackRequestMetrics("").Times(1) mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1) @@ -279,8 +344,8 @@ func TestWebhookRequestHandlerWithOutputToGatewayAndSource(t *testing.T) { respBody, _ := json.Marshal(responses) _, _ = w.Write(respBody) })) - webhookHandler := Setup(mockGW, stats.Default, func(bt *batchWebhookTransformerT) { - bt.sourceTransformerURL = transformerServer.URL + webhookHandler := Setup(mockGW, transformer.NewNoOpService(), stats.Default, func(bt *batchWebhookTransformerT) { + bt.sourceTransformAdapter = getMockSourceTransformAdapterFunc(transformerServer.URL) }) mockGW.EXPECT().TrackRequestMetrics("").Times(1) mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1) @@ -310,7 +375,7 @@ func TestRecordWebhookErrors(t *testing.T) { ctrl := gomock.NewController(t) mockGW := mockWebhook.NewMockGateway(ctrl) statsStore := memstats.New() - webhookHandler := Setup(mockGW, statsStore) + webhookHandler := Setup(mockGW, transformer.NewNoOpService(), statsStore) reqs := []*webhookT{ {authContext: &gwtypes.AuthRequestContext{WriteKey: "w1"}}, {authContext: &gwtypes.AuthRequestContext{WriteKey: "w2"}}, diff --git a/processor/manager.go b/processor/manager.go index e8e7ead40a..bf12c06238 100644 --- a/processor/manager.go +++ b/processor/manager.go @@ -16,32 +16,34 @@ import ( transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation" "github.com/rudderlabs/rudder-server/services/fileuploader" "github.com/rudderlabs/rudder-server/services/rsources" + transformerFeaturesService "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/services/transientsource" "github.com/rudderlabs/rudder-server/utils/types" ) type LifecycleManager struct { - Handle *Handle - mainCtx context.Context - currentCancel context.CancelFunc - waitGroup interface{ Wait() } - gatewayDB *jobsdb.Handle - routerDB *jobsdb.Handle - batchRouterDB *jobsdb.Handle - readErrDB *jobsdb.Handle - writeErrDB *jobsdb.Handle - esDB *jobsdb.Handle - arcDB *jobsdb.Handle - clearDB *bool - ReportingI types.Reporting // need not initialize again - BackendConfig backendconfig.BackendConfig - Transformer transformer.Transformer - transientSources transientsource.Service - fileuploader fileuploader.Provider - rsourcesService rsources.JobService - destDebugger destinationdebugger.DestinationDebugger - transDebugger transformationdebugger.TransformationDebugger - enrichers []enricher.PipelineEnricher + Handle *Handle + mainCtx context.Context + currentCancel context.CancelFunc + waitGroup interface{ Wait() } + gatewayDB *jobsdb.Handle + routerDB *jobsdb.Handle + batchRouterDB *jobsdb.Handle + readErrDB *jobsdb.Handle + writeErrDB *jobsdb.Handle + esDB *jobsdb.Handle + arcDB *jobsdb.Handle + clearDB *bool + ReportingI types.Reporting // need not initialize again + BackendConfig backendconfig.BackendConfig + Transformer transformer.Transformer + transientSources transientsource.Service + fileuploader fileuploader.Provider + rsourcesService rsources.JobService + transformerFeaturesService transformerFeaturesService.FeaturesService + destDebugger destinationdebugger.DestinationDebugger + transDebugger transformationdebugger.TransformationDebugger + enrichers []enricher.PipelineEnricher } // Start starts a processor, this is not a blocking call. @@ -65,6 +67,7 @@ func (proc *LifecycleManager) Start() error { proc.transientSources, proc.fileuploader, proc.rsourcesService, + proc.transformerFeaturesService, proc.destDebugger, proc.transDebugger, proc.enrichers, @@ -96,12 +99,6 @@ func (proc *LifecycleManager) Stop() { proc.Handle.Shutdown() } -func WithFeaturesRetryMaxAttempts(maxAttempts int) func(l *LifecycleManager) { - return func(l *LifecycleManager) { - l.Handle.config.featuresRetryMaxAttempts = maxAttempts - } -} - // New creates a new Processor instance func New( ctx context.Context, @@ -111,6 +108,7 @@ func New( transientSources transientsource.Service, fileuploader fileuploader.Provider, rsourcesService rsources.JobService, + transformerFeaturesService transformerFeaturesService.FeaturesService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger, enrichers []enricher.PipelineEnricher, @@ -125,23 +123,24 @@ func New( stats.Default, ), ), - mainCtx: ctx, - gatewayDB: gwDb, - routerDB: rtDb, - batchRouterDB: brtDb, - readErrDB: errDbForRead, - writeErrDB: errDBForWrite, - esDB: esDB, - arcDB: arcDB, - clearDB: clearDb, - BackendConfig: backendconfig.DefaultBackendConfig, - ReportingI: reporting, - transientSources: transientSources, - fileuploader: fileuploader, - rsourcesService: rsourcesService, - destDebugger: destDebugger, - transDebugger: transDebugger, - enrichers: enrichers, + mainCtx: ctx, + gatewayDB: gwDb, + routerDB: rtDb, + batchRouterDB: brtDb, + readErrDB: errDbForRead, + writeErrDB: errDBForWrite, + esDB: esDB, + arcDB: arcDB, + clearDB: clearDb, + BackendConfig: backendconfig.DefaultBackendConfig, + ReportingI: reporting, + transientSources: transientSources, + fileuploader: fileuploader, + rsourcesService: rsourcesService, + transformerFeaturesService: transformerFeaturesService, + destDebugger: destDebugger, + transDebugger: transDebugger, + enrichers: enrichers, } for _, opt := range opts { opt(proc) diff --git a/processor/manager_test.go b/processor/manager_test.go index 09057f6bd1..b92b2bd583 100644 --- a/processor/manager_test.go +++ b/processor/manager_test.go @@ -3,7 +3,6 @@ package processor import ( "context" "database/sql" - "encoding/json" "flag" "fmt" "log" @@ -32,6 +31,7 @@ import ( transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation" "github.com/rudderlabs/rudder-server/services/fileuploader" "github.com/rudderlabs/rudder-server/services/rsources" + "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/services/transientsource" "github.com/rudderlabs/rudder-server/utils/pubsub" ) @@ -218,12 +218,12 @@ func TestProcessorManager(t *testing.T) { transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), mockRsourcesService, + transformer.NewNoOpService(), destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, func(m *LifecycleManager) { m.Handle.config.enablePipelining = false - m.Handle.config.featuresRetryMaxAttempts = 0 }) t.Run("jobs are already there in GW DB before processor starts", func(t *testing.T) { @@ -244,7 +244,6 @@ func TestProcessorManager(t *testing.T) { }, ) mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1) - processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures) mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1) processor.BackendConfig = mockBackendConfig processor.Handle.transformer = mockTransformer @@ -284,7 +283,6 @@ func TestProcessorManager(t *testing.T) { ) mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1) - processor.Handle.transformerFeatures = json.RawMessage(defaultTransformerFeatures) mockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), rsources.Stats{Out: 10}).Times(1) require.NoError(t, processor.Start()) diff --git a/processor/processor.go b/processor/processor.go index d36e6b2417..e2076b5919 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1,13 +1,10 @@ package processor import ( - "bytes" "context" "encoding/json" "errors" "fmt" - "io" - "net/http" "runtime/trace" "slices" "strconv" @@ -45,8 +42,8 @@ import ( "github.com/rudderlabs/rudder-server/services/fileuploader" "github.com/rudderlabs/rudder-server/services/rmetrics" "github.com/rudderlabs/rudder-server/services/rsources" + transformerFeaturesService "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/services/transientsource" - "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/misc" . "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck "github.com/rudderlabs/rudder-server/utils/types" @@ -75,36 +72,35 @@ type Handle struct { transformer transformer.Transformer lastJobID int64 - gatewayDB jobsdb.JobsDB - routerDB jobsdb.JobsDB - batchRouterDB jobsdb.JobsDB - readErrorDB jobsdb.JobsDB - writeErrorDB jobsdb.JobsDB - eventSchemaDB jobsdb.JobsDB - archivalDB jobsdb.JobsDB - - logger logger.Logger - eventSchemaHandler types.EventSchemasI - enrichers []enricher.PipelineEnricher - dedup dedup.Dedup - reporting types.Reporting - reportingEnabled bool - backgroundWait func() error - backgroundCancel context.CancelFunc - transformerFeatures json.RawMessage - statsFactory stats.Stats - stats processorStats - payloadLimit misc.ValueLoader[int64] - jobsDBCommandTimeout misc.ValueLoader[time.Duration] - jobdDBQueryRequestTimeout misc.ValueLoader[time.Duration] - jobdDBMaxRetries misc.ValueLoader[int] - transientSources transientsource.Service - fileuploader fileuploader.Provider - rsourcesService rsources.JobService - destDebugger destinationdebugger.DestinationDebugger - transDebugger transformationdebugger.TransformationDebugger - isolationStrategy isolation.Strategy - limiter struct { + gatewayDB jobsdb.JobsDB + routerDB jobsdb.JobsDB + batchRouterDB jobsdb.JobsDB + readErrorDB jobsdb.JobsDB + writeErrorDB jobsdb.JobsDB + eventSchemaDB jobsdb.JobsDB + archivalDB jobsdb.JobsDB + logger logger.Logger + eventSchemaHandler types.EventSchemasI + enrichers []enricher.PipelineEnricher + dedup dedup.Dedup + reporting types.Reporting + reportingEnabled bool + backgroundWait func() error + backgroundCancel context.CancelFunc + statsFactory stats.Stats + stats processorStats + payloadLimit misc.ValueLoader[int64] + jobsDBCommandTimeout misc.ValueLoader[time.Duration] + jobdDBQueryRequestTimeout misc.ValueLoader[time.Duration] + jobdDBMaxRetries misc.ValueLoader[int] + transientSources transientsource.Service + fileuploader fileuploader.Provider + rsourcesService rsources.JobService + transformerFeaturesService transformerFeaturesService.FeaturesService + destDebugger destinationdebugger.DestinationDebugger + transDebugger transformationdebugger.TransformationDebugger + isolationStrategy isolation.Strategy + limiter struct { read kitsync.Limiter preprocess kitsync.Limiter transform kitsync.Limiter @@ -113,7 +109,6 @@ type Handle struct { config struct { isolationMode isolation.Mode mainLoopTimeout time.Duration - featuresRetryMaxAttempts int enablePipelining bool pipelineBufferedItems int subJobSize int @@ -194,13 +189,6 @@ type processorStats struct { DBWriteThroughput stats.Measurement } -var defaultTransformerFeatures = `{ - "routerTransform": { - "MARKETO": true, - "HS": true - } - }` - type DestStatT struct { numEvents stats.Measurement numOutputSuccessEvents stats.Measurement @@ -365,6 +353,7 @@ func (proc *Handle) Setup( transientSources transientsource.Service, fileuploader fileuploader.Provider, rsourcesService rsources.JobService, + transformerFeaturesService transformerFeaturesService.FeaturesService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger, enrichers []enricher.PipelineEnricher, @@ -393,6 +382,8 @@ func (proc *Handle) Setup( proc.rsourcesService = rsourcesService proc.enrichers = enrichers + proc.transformerFeaturesService = transformerFeaturesService + if proc.adaptiveLimit == nil { proc.adaptiveLimit = func(limit int64) int64 { return limit } } @@ -467,17 +458,12 @@ func (proc *Handle) Setup( proc.backgroundWait = g.Wait proc.backgroundCancel = cancel - proc.config.asyncInit = misc.NewAsyncInit(2) + proc.config.asyncInit = misc.NewAsyncInit(1) g.Go(misc.WithBugsnag(func() error { proc.backendConfigSubscriber(ctx) return nil })) - g.Go(misc.WithBugsnag(func() error { - proc.syncTransformerFeatureJson(ctx) - return nil - })) - // periodically publish a zero counter for ensuring that stuck processing pipeline alert // can always detect a stuck processor g.Go(misc.WithBugsnag(func() error { @@ -550,6 +536,16 @@ func (proc *Handle) Start(ctx context.Context) error { } proc.logger.Info("Async init group done") + // waiting for init group + proc.logger.Info("Waiting for transformer features") + select { + case <-ctx.Done(): + return nil + case <-proc.transformerFeaturesService.Wait(): + // proceed + } + proc.logger.Info("Transformer features received") + h := &workerHandleAdapter{proc} pool := workerpool.New(ctx, func(partition string) workerpool.Worker { return newProcessorWorker(partition, h) }, proc.logger) defer pool.Shutdown() @@ -603,7 +599,6 @@ func (proc *Handle) Shutdown() { func (proc *Handle) loadConfig() { proc.config.mainLoopTimeout = 200 * time.Millisecond - proc.config.featuresRetryMaxAttempts = 10 defaultSubJobSize := 2000 defaultMaxEventsToProcess := 10000 @@ -655,79 +650,6 @@ func (proc *Handle) loadReloadableConfig(defaultPayloadLimit int64, defaultMaxEv proc.config.captureEventNameStats = config.GetReloadableBoolVar(false, "Processor.Stats.captureEventName") } -// syncTransformerFeatureJson polls the transformer feature json endpoint, -// -// updates the transformer feature map. -// -// It will set isUnLocked to true if it successfully fetches the transformer feature json at least once. -func (proc *Handle) syncTransformerFeatureJson(ctx context.Context) { - var initDone bool - proc.logger.Infof("Fetching transformer features from %s", proc.config.transformerURL) - for { - for i := 0; i < proc.config.featuresRetryMaxAttempts; i++ { - - if ctx.Err() != nil { - return - } - - retry := proc.makeFeaturesFetchCall() - if retry { - proc.logger.Infof("Fetched transformer features from %s (retry: %v)", proc.config.transformerURL, retry) - } - if retry { - select { - case <-ctx.Done(): - return - case <-time.After(200 * time.Millisecond): - continue - } - } - break - } - - if proc.transformerFeatures != nil && !initDone { - initDone = true - proc.config.asyncInit.Done() - } - - select { - case <-ctx.Done(): - return - case <-time.After(proc.config.pollInterval): - } - } -} - -func (proc *Handle) makeFeaturesFetchCall() bool { - url := proc.config.transformerURL + "/features" - req, err := http.NewRequest("GET", url, bytes.NewReader([]byte{})) - if err != nil { - proc.logger.Error("error creating request - %s", err) - return true - } - tr := &http.Transport{} - client := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.processor.timeout", 30, time.Second)} - res, err := client.Do(req) - if err != nil { - proc.logger.Error("error sending request - %s", err) - return true - } - - defer func() { httputil.CloseResponse(res) }() - body, err := io.ReadAll(res.Body) - if err != nil { - return true - } - - if res.StatusCode == 200 { - proc.transformerFeatures = body - } else if res.StatusCode == 404 { - proc.transformerFeatures = json.RawMessage(defaultTransformerFeatures) - } - - return false -} - func (proc *Handle) backendConfigSubscriber(ctx context.Context) { var initDone bool ch := proc.backendConfig.Subscribe(ctx, backendconfig.TopicProcessConfig) @@ -2309,7 +2231,7 @@ func (proc *Handle) transformSrcDest( if transformAtOverrideFound { transformAt = config.GetString("Processor."+destination.DestinationDefinition.Name+".transformAt", "processor") } - transformAtFromFeaturesFile := gjson.Get(string(proc.transformerFeatures), fmt.Sprintf("routerTransform.%s", destination.DestinationDefinition.Name)).String() + transformAtFromFeaturesFile := proc.transformerFeaturesService.RouterTransform(destination.DestinationDefinition.Name) // Filtering events based on the supported message types - START s := time.Now() @@ -2383,7 +2305,7 @@ func (proc *Handle) transformSrcDest( // a. transformAt is processor // OR // b. transformAt is router and transformer doesn't support router transform - if transformAt == "processor" || (transformAt == "router" && transformAtFromFeaturesFile == "") { + if transformAt == "processor" || (transformAt == "router" && !transformAtFromFeaturesFile) { trace.WithRegion(ctx, "Dest Transform", func() { trace.Logf(ctx, "Dest Transform", "input size %d", len(eventsToTransform)) proc.logger.Debug("Dest Transform input size", len(eventsToTransform)) diff --git a/processor/processor_test.go b/processor/processor_test.go index d4ee4fc005..27c72b48c7 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -39,6 +39,7 @@ import ( transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation" "github.com/rudderlabs/rudder-server/services/fileuploader" "github.com/rudderlabs/rudder-server/services/rsources" + transformerFeaturesService "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/services/transientsource" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/pubsub" @@ -1160,6 +1161,7 @@ var _ = Describe("Processor", Ordered, func() { transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, + transformerFeaturesService.NewNoOpService(), destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, @@ -1189,6 +1191,7 @@ var _ = Describe("Processor", Ordered, func() { transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, + transformerFeaturesService.NewNoOpService(), destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, @@ -1223,6 +1226,7 @@ var _ = Describe("Processor", Ordered, func() { transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, + transformerFeaturesService.NewNoOpService(), destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, @@ -2272,7 +2276,6 @@ var _ = Describe("Processor", Ordered, func() { // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) - processor.config.featuresRetryMaxAttempts = 0 processor.Setup( c.mockBackendConfig, c.mockGatewayJobsDB, @@ -2286,6 +2289,7 @@ var _ = Describe("Processor", Ordered, func() { transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, + getMockTransformerService(), destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, @@ -2312,7 +2316,7 @@ var _ = Describe("Processor", Ordered, func() { Consistently(func() bool { select { - case <-processor.config.asyncInit.Wait(): + case <-processor.transformerFeaturesService.Wait(): return true default: return false @@ -2330,7 +2334,6 @@ var _ = Describe("Processor", Ordered, func() { // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) - processor.config.featuresRetryMaxAttempts = 0 processor.Setup( c.mockBackendConfig, c.mockGatewayJobsDB, @@ -2344,6 +2347,7 @@ var _ = Describe("Processor", Ordered, func() { transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, + transformerFeaturesService.NewNoOpService(), destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, @@ -4010,6 +4014,7 @@ func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool) transientsource.NewStaticService([]string{SourceIDTransient}), fileuploader.NewDefaultProvider(), c.MockRsourcesService, + transformerFeaturesService.NewNoOpService(), destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, @@ -4366,3 +4371,21 @@ func TestStoreMessageMerge(t *testing.T) { require.Len(t, merged.dedupKeys, 2, "dedup keys should have 2 elements") require.Equal(t, merged.totalEvents, 2, "total events should be 2") } + +func getMockTransformerService() transformerFeaturesService.FeaturesService { + return &mockTransformerService{} +} + +type mockTransformerService struct{} + +func (*mockTransformerService) SourceTransformerVersion() string { + return "random-version" +} + +func (*mockTransformerService) Wait() chan struct{} { + return make(chan struct{}) +} + +func (*mockTransformerService) RouterTransform(destType string) bool { + return false +} diff --git a/services/transformer/features.go b/services/transformer/features.go new file mode 100644 index 0000000000..9149e266ce --- /dev/null +++ b/services/transformer/features.go @@ -0,0 +1,67 @@ +package transformer + +import ( + "context" + "encoding/json" + "time" + + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-server/rruntime" +) + +const ( + V0 = "v0" + V1 = "v1" +) + +type FeaturesServiceConfig struct { + PollInterval time.Duration + TransformerURL string + FeaturesRetryMaxAttempts int +} + +type FeaturesService interface { + SourceTransformerVersion() string + RouterTransform(destType string) bool + Wait() chan struct{} +} + +var defaultTransformerFeatures = `{ + "routerTransform": { + "MARKETO": true, + "HS": true + } + }` + +func NewFeaturesService(ctx context.Context, config FeaturesServiceConfig) FeaturesService { + handler := &featuresService{ + features: json.RawMessage(defaultTransformerFeatures), + logger: logger.NewLogger().Child("transformer-features"), + waitChan: make(chan struct{}), + config: config, + } + + rruntime.Go(func() { handler.syncTransformerFeatureJson(ctx) }) + + return handler +} + +func NewNoOpService() FeaturesService { + return &noopService{} +} + +type noopService struct{} + +func (*noopService) SourceTransformerVersion() string { + return V0 +} + +func (*noopService) Wait() chan struct{} { + dummyChan := make(chan struct{}) + close(dummyChan) + return dummyChan +} + +func (*noopService) RouterTransform(_ string) bool { + return false +} diff --git a/services/transformer/features_impl.go b/services/transformer/features_impl.go new file mode 100644 index 0000000000..957ddb48d6 --- /dev/null +++ b/services/transformer/features_impl.go @@ -0,0 +1,107 @@ +package transformer + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "time" + + "github.com/tidwall/gjson" + + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-server/utils/httputil" +) + +type featuresService struct { + logger logger.Logger + waitChan chan struct{} + config FeaturesServiceConfig + features json.RawMessage +} + +func (t *featuresService) SourceTransformerVersion() string { + if gjson.GetBytes(t.features, "supportSourceTransformV1").Bool() { + return V1 + } + + return V0 +} + +func (t *featuresService) RouterTransform(destType string) bool { + return gjson.GetBytes(t.features, "routerTransform."+destType).Bool() +} + +func (t *featuresService) Wait() chan struct{} { + return t.waitChan +} + +func (t *featuresService) syncTransformerFeatureJson(ctx context.Context) { + var initDone bool + t.logger.Infof("Fetching transformer features from %s", t.config.TransformerURL) + for { + var downloaded bool + for i := 0; i < t.config.FeaturesRetryMaxAttempts; i++ { + + if ctx.Err() != nil { + return + } + + retry := t.makeFeaturesFetchCall() + if retry { + t.logger.Infof("Fetched transformer features from %s (retry: %v)", t.config.TransformerURL, retry) + select { + case <-ctx.Done(): + return + case <-time.After(200 * time.Millisecond): + continue + } + } + downloaded = true + break + } + + if downloaded && !initDone { + initDone = true + close(t.waitChan) + } + + select { + case <-ctx.Done(): + return + case <-time.After(t.config.PollInterval): + } + } +} + +func (t *featuresService) makeFeaturesFetchCall() bool { + url := t.config.TransformerURL + "/features" + req, err := http.NewRequest("GET", url, bytes.NewReader([]byte{})) + if err != nil { + t.logger.Error("error creating request - ", err) + return true + } + tr := &http.Transport{} + client := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.processor.timeout", 30, time.Second)} + res, err := client.Do(req) + if err != nil { + t.logger.Error("error sending request - ", err) + return true + } + + defer func() { httputil.CloseResponse(res) }() + body, err := io.ReadAll(res.Body) + if err != nil { + return true + } + + if res.StatusCode == 200 { + t.features = body + } else if res.StatusCode == 404 { + t.features = json.RawMessage(defaultTransformerFeatures) + } + + return false +} diff --git a/services/transformer/features_impl_test.go b/services/transformer/features_impl_test.go new file mode 100644 index 0000000000..398eab35c8 --- /dev/null +++ b/services/transformer/features_impl_test.go @@ -0,0 +1,117 @@ +package transformer + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/rudderlabs/rudder-go-kit/logger" +) + +var _ = Describe("Transformer features", func() { + Context("Transformer features service", func() { + It("handler should wait till features are not fetched", func() { + handler := &featuresService{ + logger: logger.NewLogger(), + waitChan: make(chan struct{}), + config: FeaturesServiceConfig{ + PollInterval: time.Duration(1), + FeaturesRetryMaxAttempts: 1, + }, + } + + Consistently(func() bool { + select { + case <-handler.Wait(): + return true + default: + return false + } + }, 2*time.Second, 10*time.Millisecond).Should(BeFalse()) + }) + + It("before features are fetched, SourceTransformerVersion should return v0", func() { + handler := &featuresService{ + features: json.RawMessage(defaultTransformerFeatures), + logger: logger.NewLogger(), + waitChan: make(chan struct{}), + config: FeaturesServiceConfig{ + PollInterval: time.Duration(1), + FeaturesRetryMaxAttempts: 1, + }, + } + + Expect(handler.SourceTransformerVersion()).To(Equal(V0)) + }) + + It("before features are fetched, defaultTransformerFeatures must be served", func() { + handler := &featuresService{ + features: json.RawMessage(defaultTransformerFeatures), + logger: logger.NewLogger(), + waitChan: make(chan struct{}), + config: FeaturesServiceConfig{ + PollInterval: time.Duration(1), + FeaturesRetryMaxAttempts: 1, + }, + } + + Expect(handler.RouterTransform("MARKETO")).To(BeTrue()) + Expect(handler.RouterTransform("HS")).To(BeTrue()) + Expect(handler.RouterTransform("ACTIVE_CAMPAIGN")).To(BeFalse()) + Expect(handler.RouterTransform("ALGOLIA")).To(BeFalse()) + }) + + It("if transformer returns 404, features should be same as defaultTransformerFeatures", func() { + transformerServer := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "not found error", http.StatusNotFound) + })) + + handler := NewFeaturesService(context.TODO(), FeaturesServiceConfig{ + PollInterval: time.Duration(1), + TransformerURL: transformerServer.URL, + FeaturesRetryMaxAttempts: 1, + }) + + <-handler.Wait() + + Expect(handler.RouterTransform("MARKETO")).To(BeTrue()) + Expect(handler.RouterTransform("HS")).To(BeTrue()) + Expect(handler.RouterTransform("ACTIVE_CAMPAIGN")).To(BeFalse()) + Expect(handler.RouterTransform("ALGOLIA")).To(BeFalse()) + }) + + It("Get should return features fetched from transformer", func() { + mockTransformerResp := `{ + "routerTransform": { + "a": true, + "b": true + }, + "supportSourceTransformV1": true + }` + transformerServer := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(mockTransformerResp)) + })) + + handler := NewFeaturesService(context.TODO(), FeaturesServiceConfig{ + PollInterval: time.Duration(1), + TransformerURL: transformerServer.URL, + FeaturesRetryMaxAttempts: 1, + }) + + <-handler.Wait() + + Expect(handler.RouterTransform("MARKETO")).To(BeFalse()) + Expect(handler.RouterTransform("HS")).To(BeFalse()) + Expect(handler.RouterTransform("a")).To(BeTrue()) + Expect(handler.RouterTransform("b")).To(BeTrue()) + Expect(handler.SourceTransformerVersion()).To(Equal(V1)) + }) + }) +}) diff --git a/services/transformer/features_suite_test.go b/services/transformer/features_suite_test.go new file mode 100644 index 0000000000..baa91f7965 --- /dev/null +++ b/services/transformer/features_suite_test.go @@ -0,0 +1,13 @@ +package transformer + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestTransformerFeatures(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Transformer features Suite") +}