Skip to content

Commit

Permalink
feat: new transformer service to fetch and serve transformer features (
Browse files Browse the repository at this point in the history
  • Loading branch information
anantjain45823 committed Nov 10, 2023
1 parent 47b2f92 commit f95fa51
Show file tree
Hide file tree
Showing 22 changed files with 741 additions and 226 deletions.
10 changes: 9 additions & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/payload"
Expand Down Expand Up @@ -130,6 +131,12 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
return err
}

transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{
PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second),
TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"),
FeaturesRetryMaxAttempts: 10,
})

// This separate gateway db is created just to be used with gateway because in case of degraded mode,
// the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it
// will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode.
Expand Down Expand Up @@ -257,6 +264,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
transientSources,
fileUploaderProvider,
rsourcesService,
transformerFeaturesService,
destinationHandle,
transformationhandle,
enrichers,
Expand Down Expand Up @@ -319,7 +327,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
ctx,
config, logger.NewLogger().Child("gateway"), stats.Default,
a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDBForWrite,
rateLimiter, a.versionHandler, rsourcesService, sourceHandle,
rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
)
if err != nil {
return fmt.Errorf("could not setup gateway: %w", err)
Expand Down
9 changes: 8 additions & 1 deletion app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/rudderlabs/rudder-server/services/db"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)
Expand Down Expand Up @@ -122,11 +124,16 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
if err != nil {
return err
}
transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{
PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second),
TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"),
FeaturesRetryMaxAttempts: 10,
})
err = gw.Setup(
ctx,
config, logger.NewLogger().Child("gateway"), stats.Default,
a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDB,
rateLimiter, a.versionHandler, rsourcesService, sourceHandle,
rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
)
if err != nil {
return fmt.Errorf("failed to setup gateway: %w", err)
Expand Down
8 changes: 8 additions & 0 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/payload"
Expand Down Expand Up @@ -137,6 +138,12 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
return err
}

transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{
PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second),
TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"),
FeaturesRetryMaxAttempts: 10,
})

gwDBForProcessor := jobsdb.NewForRead(
"gw",
jobsdb.WithClearDB(options.ClearDB),
Expand Down Expand Up @@ -247,6 +254,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
transientSources,
fileUploaderProvider,
rsourcesService,
transformerFeaturesService,
destinationHandle,
transformationhandle,
enrichers,
Expand Down
3 changes: 2 additions & 1 deletion app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/types/servermode"
Expand Down Expand Up @@ -208,10 +209,10 @@ func TestDynamicClusterManager(t *testing.T) {
transientsource.NewEmptyService(),
fileuploader.NewDefaultProvider(),
rsources.NewNoOpService(),
transformer.NewNoOpService(),
destinationdebugger.NewNoOpService(),
transformationdebugger.NewNoOpService(),
[]enricher.PipelineEnricher{},
processor.WithFeaturesRetryMaxAttempts(0),
)
processor.BackendConfig = mockBackendConfig
processor.Transformer = mockTransformer
Expand Down
21 changes: 11 additions & 10 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
mocksTypes "github.com/rudderlabs/rudder-server/mocks/utils/types"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/pubsub"
testutils "github.com/rudderlabs/rudder-server/utils/tests"
Expand Down Expand Up @@ -266,7 +267,7 @@ var _ = Describe("Gateway Enterprise", func() {
Context("Suppress users", func() {
BeforeEach(func() {
gateway = &Handle{}
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())
statsStore = memstats.New()
gateway.stats = statsStore
Expand Down Expand Up @@ -402,7 +403,7 @@ var _ = Describe("Gateway", func() {
Context("Initialization", func() {
It("should wait for backend config", func() {
gateway := &Handle{}
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)
err = gateway.Shutdown()
Expand Down Expand Up @@ -435,7 +436,7 @@ var _ = Describe("Gateway", func() {
GinkgoT().Setenv("RSERVER_GATEWAY_WEB_PORT", strconv.Itoa(serverPort))

gateway = &Handle{}
err = gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
err = gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)
gateway.irh = mockRequestHandler{}
Expand Down Expand Up @@ -526,7 +527,7 @@ var _ = Describe("Gateway", func() {

BeforeEach(func() {
gateway = &Handle{}
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)
statsStore = memstats.New()
Expand Down Expand Up @@ -854,7 +855,7 @@ var _ = Describe("Gateway", func() {
BeforeEach(func() {
gateway = &Handle{}

err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)

Expand Down Expand Up @@ -959,7 +960,7 @@ var _ = Describe("Gateway", func() {
BeforeEach(func() {
gateway = &Handle{}
conf.Set("Gateway.enableRateLimit", true)
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, c.mockRateLimiter, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)

Expand Down Expand Up @@ -1051,7 +1052,7 @@ var _ = Describe("Gateway", func() {

BeforeEach(func() {
gateway = &Handle{}
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)

Expand Down Expand Up @@ -1334,7 +1335,7 @@ var _ = Describe("Gateway", func() {

BeforeEach(func() {
gateway = &Handle{}
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)
})
Expand All @@ -1360,7 +1361,7 @@ var _ = Describe("Gateway", func() {
)
BeforeEach(func() {
gateway = &Handle{}
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)
})
Expand Down Expand Up @@ -1541,7 +1542,7 @@ var _ = Describe("Gateway", func() {
var gateway *Handle
BeforeEach(func() {
gateway = &Handle{}
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
err := gateway.Setup(context.Background(), conf, logger.NOP, stats.Default, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)
})
Expand Down
1 change: 1 addition & 0 deletions gateway/handle_http_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func sourceToRequestContext(s backendconfig.SourceT) *gwtypes.AuthRequestContext
SourceCategory: s.SourceDefinition.Category,
SourceDefName: s.SourceDefinition.Name,
ReplaySource: s.IsReplaySource(),
Source: s,
}
if arctx.SourceCategory == "" {
arctx.SourceCategory = eventStreamSourceCategory
Expand Down
5 changes: 3 additions & 2 deletions gateway/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/services/rsources"
rsources_http "github.com/rudderlabs/rudder-server/services/rsources/http"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
)

Expand All @@ -50,7 +51,7 @@ func (gw *Handle) Setup(
config *config.Config, logger logger.Logger, stat stats.Stats,
application app.App, backendConfig backendconfig.BackendConfig, jobsDB, errDB jobsdb.JobsDB,
rateLimiter throttler.Throttler, versionHandler func(w http.ResponseWriter, r *http.Request),
rsourcesService rsources.JobService, sourcehandle sourcedebugger.SourceDebugger,
rsourcesService rsources.JobService, transformerFeaturesService transformer.FeaturesService, sourcehandle sourcedebugger.SourceDebugger,
) error {
gw.config = config
gw.logger = logger
Expand Down Expand Up @@ -118,7 +119,7 @@ func (gw *Handle) Setup(
gw.batchUserWorkerBatchRequestQ = make(chan *batchUserWorkerBatchRequestT, gw.conf.maxDBWriterProcess)
gw.irh = &ImportRequestHandler{Handle: gw}
gw.rrh = &RegularRequestHandler{Handle: gw}
gw.webhook = webhook.Setup(gw, gw.stats)
gw.webhook = webhook.Setup(gw, transformerFeaturesService, gw.stats)
whURL, err := url.ParseRequestURI(misc.GetWarehouseURL())
if err != nil {
return fmt.Errorf("invalid warehouse URL %s: %w", whURL, err)
Expand Down
6 changes: 5 additions & 1 deletion gateway/internal/types/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package types

import "github.com/rudderlabs/rudder-server/utils/misc"
import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/utils/misc"
)

type ContextKey string

Expand All @@ -23,6 +26,7 @@ type AuthRequestContext struct {
ReplaySource bool
SourceJobRunID string
SourceTaskRunID string
Source backendconfig.SourceT
}

func (arctx *AuthRequestContext) SourceTag() string {
Expand Down
4 changes: 4 additions & 0 deletions gateway/response/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ const (
ContextDeadlineExceeded = "context deadline exceeded"
// GatewayTimeout - Gateway timeout
GatewayTimeout = "Gateway timeout"
// ServiceUnavailable - Service unavailable
ServiceUnavailable = "Service unavailable"
// NoSourceIdInHeader - Failed to read source id from header
NoSourceIdInHeader = "Failed to read source id from header"
// InvalidSourceID - Invalid source id
Expand Down Expand Up @@ -100,6 +102,8 @@ var statusMap = map[string]status{
ErrorInParseMultiform: {message: ErrorInParseMultiform, code: http.StatusBadRequest},
NotRudderEvent: {message: NotRudderEvent, code: http.StatusBadRequest},
ContextDeadlineExceeded: {message: GatewayTimeout, code: http.StatusGatewayTimeout},
GatewayTimeout: {message: GatewayTimeout, code: http.StatusGatewayTimeout},
ServiceUnavailable: {message: ServiceUnavailable, code: http.StatusServiceUnavailable},
}

// status holds the gateway response status message and code
Expand Down
22 changes: 15 additions & 7 deletions gateway/webhook/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"time"

"github.com/hashicorp/go-retryablehttp"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
"github.com/rudderlabs/rudder-server/gateway/webhook/model"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
)

Expand All @@ -42,10 +45,8 @@ func newWebhookStats() *webhookStatsT {
return &wStats
}

func Setup(gwHandle Gateway, stat stats.Stats, opts ...batchTransformerOption) *HandleT {
func Setup(gwHandle Gateway, transformerFeaturesService transformer.FeaturesService, stat stats.Stats, opts ...batchTransformerOption) *HandleT {
webhook := &HandleT{gwHandle: gwHandle, stats: stat, logger: logger.NewLogger().Child("gateway").Child("webhook")}

sourceTransformerURL := strings.TrimSuffix(config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"), "/") + "/v0/sources"
// Number of incoming webhooks that are batched before calling source transformer
webhook.config.maxWebhookBatchSize = config.GetReloadableIntVar(32, 1, "Gateway.webhook.maxBatchSize")
// Timeout after which batch is formed anyway with whatever webhooks are available
Expand Down Expand Up @@ -75,9 +76,16 @@ func Setup(gwHandle Gateway, stat stats.Stats, opts ...batchTransformerOption) *
for i := 0; i < maxTransformerProcess; i++ {
g.Go(misc.WithBugsnag(func() error {
bt := batchWebhookTransformerT{
webhook: webhook,
stats: newWebhookStats(),
sourceTransformerURL: sourceTransformerURL,
webhook: webhook,
stats: newWebhookStats(),
sourceTransformAdapter: func(ctx context.Context) (sourceTransformAdapter, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-transformerFeaturesService.Wait():
return newSourceTransformAdapter(transformerFeaturesService.SourceTransformerVersion()), nil
}
},
}
for _, opt := range opts {
opt(&bt)
Expand Down
Loading

0 comments on commit f95fa51

Please sign in to comment.