Skip to content

Commit

Permalink
refactor: gateway (#3708)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Aug 8, 2023
1 parent 7719719 commit 3cc17f3
Show file tree
Hide file tree
Showing 13 changed files with 1,850 additions and 1,802 deletions.
10 changes: 6 additions & 4 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (a *embeddedApp) Setup(options *app.Options) error {
}

func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) error {
config := config.Default
if !a.setupDone {
return fmt.Errorf("embedded rudder core cannot start, database is not setup")
}
Expand Down Expand Up @@ -202,14 +203,14 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)

var schemaForwarder schema_forwarder.Forwarder
if config.GetBool("EventSchemas2.enabled", false) {
client, err := pulsar.NewClient(config.Default)
client, err := pulsar.NewClient(config)
if err != nil {
return err
}
defer client.Close()
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config.Default, stats.Default)
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, stats.Default)
} else {
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config.Default, stats.Default)
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config, stats.Default)
}

modeProvider, err := resolveModeProvider(a.log, deploymentType)
Expand Down Expand Up @@ -280,9 +281,10 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
if err != nil {
return fmt.Errorf("failed to create gw rate limiter: %w", err)
}
gw := gateway.HandleT{}
gw := gateway.Handle{}
err = gw.Setup(
ctx,
config, logger.NewLogger().Child("gateway"), stats.Default,
a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDBForWrite,
rateLimiter, a.versionHandler, rsourcesService, sourceHandle,
)
Expand Down
4 changes: 3 additions & 1 deletion app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (a *gatewayApp) Setup(options *app.Options) error {
}

func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) error {
config := config.Default
if !a.setupDone {
return fmt.Errorf("gateway cannot start, database is not setup")
}
Expand Down Expand Up @@ -116,7 +117,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
return dm.Run(ctx)
})

var gw gateway.HandleT
var gw gateway.Handle
rateLimiter, err := gwThrottler.New(stats.Default)
if err != nil {
return fmt.Errorf("failed to create rate limiter: %w", err)
Expand All @@ -127,6 +128,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
}
err = gw.Setup(
ctx,
config, logger.NewLogger().Child("gateway"), stats.Default,
a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDB,
rateLimiter, a.versionHandler, rsourcesService, sourceHandle,
)
Expand Down
80 changes: 0 additions & 80 deletions gateway/configuration.go

This file was deleted.

Loading

0 comments on commit 3cc17f3

Please sign in to comment.