-
Notifications
You must be signed in to change notification settings - Fork 298
/
gatewayAppHandler.go
137 lines (119 loc) · 3.71 KB
/
gatewayAppHandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package apphandlers
import (
"context"
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/gateway"
gwThrottler "github.com/rudderlabs/rudder-server/gateway/throttler"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/services/db"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)
// gatewayApp is the gateway application handler. It holds the dependencies
// and configuration that Setup and StartRudderCore use to bring up the
// gateway.
type gatewayApp struct {
	// setupDone is set to true at the end of a successful Setup and is
	// checked by StartRudderCore before anything is started.
	setupDone bool
	// app is handed to the gateway when it is set up in StartRudderCore.
	app app.App
	// versionHandler is an HTTP handler function passed through to the
	// gateway's Setup call.
	versionHandler func(w http.ResponseWriter, r *http.Request)
	// log is the logger used for the handler's startup and shutdown messages.
	log logger.Logger
	// config groups the values registered by loadConfiguration.
	config struct {
		// gatewayDSLimit is registered under the "Gateway.jobsDB.dsLimit" and
		// "JobsDB.dsLimit" config keys; a pointer to it is given to the gateway
		// jobs DB via jobsdb.WithDSLimit.
		gatewayDSLimit int
	}
}
// loadConfiguration binds a.config.gatewayDSLimit to the configuration keys
// "Gateway.jobsDB.dsLimit" and "JobsDB.dsLimit".
//
// NOTE(review): the leading arguments (0, true, 1) are presumably the default
// value, the hot-reload flag and the value scale — confirm against the
// rudder-go-kit config package.
func (a *gatewayApp) loadConfiguration() {
	dsLimit := &a.config.gatewayDSLimit
	config.RegisterIntConfigVariable(
		0, dsLimit, true, 1,
		"Gateway.jobsDB.dsLimit", "JobsDB.dsLimit",
	)
}
// Setup loads the handler's configuration and then runs the database
// preparation steps in order: null recovery handling for the gateway app
// type, the core DB validator, and the workspace table setup.
//
// The first failing step's error is returned unchanged and setupDone stays
// false; setupDone is only set once every step has succeeded.
func (a *gatewayApp) Setup(options *app.Options) error {
	a.loadConfiguration()

	recoveryErr := db.HandleNullRecovery(
		options.NormalMode,
		options.DegradedMode,
		misc.AppStartTime,
		app.GATEWAY,
	)
	if recoveryErr != nil {
		return recoveryErr
	}

	validationErr := rudderCoreDBValidator()
	if validationErr != nil {
		return validationErr
	}

	workspaceErr := rudderCoreWorkSpaceTableSetup()
	if workspaceErr != nil {
		return workspaceErr
	}

	a.setupDone = true
	return nil
}
// StartRudderCore brings up the gateway and blocks until the errgroup that
// runs the cluster mode manager and the gateway's admin and web handlers has
// finished, returning that group's first error (or nil).
//
// It returns an error immediately if Setup has not completed. Otherwise it
// creates the source debugger handle, the file uploader provider and the
// gateway ("gw") jobs DB, starts the jobs DB, launches the mode manager, sets
// up the gateway and finally starts its handlers. Everything acquired along
// the way is released through defers, i.e. in reverse order of acquisition.
//
// ctx is the parent context; the goroutines run under a context derived from
// it. options supplies the ClearDB flag used for the jobs DB.
func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) error {
	if !a.setupDone {
		return fmt.Errorf("gateway cannot start, database is not setup")
	}
	a.log.Info("Gateway starting")

	deploymentType, err := deployment.GetFromEnv()
	if err != nil {
		// %w (not %v) keeps the cause available to errors.Is / errors.As,
		// matching the other wraps in this function.
		return fmt.Errorf("failed to get deployment type: %w", err)
	}
	a.log.Infof("Configured deployment type: %q", deploymentType)
	a.log.Info("Clearing DB ", options.ClearDB)

	sourceHandle, err := sourcedebugger.NewHandle(backendconfig.DefaultBackendConfig)
	if err != nil {
		return err
	}
	defer sourceHandle.Stop()

	fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

	gatewayDB := jobsdb.NewForWrite(
		"gw",
		jobsdb.WithClearDB(options.ClearDB),
		jobsdb.WithDSLimit(&a.config.gatewayDSLimit),
		jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
		jobsdb.WithFileUploaderProvider(fileUploaderProvider),
	)
	defer gatewayDB.Close()
	if err := gatewayDB.Start(); err != nil {
		return fmt.Errorf("could not start gatewayDB: %w", err)
	}
	defer gatewayDB.Stop()

	// The errgroup context is only cancelled when a goroutine fails or Wait
	// returns. Without an explicit cancel, an early return below would leave
	// the mode manager goroutine running while the deferred DB shutdown
	// executes. Cancel and wait on every exit path instead; this defer runs
	// before the jobs DB is stopped and closed.
	ctx, cancel := context.WithCancel(ctx)
	g, ctx := errgroup.WithContext(ctx)
	defer func() {
		cancel()
		// The group's error is deliberately ignored here: on the normal path
		// it has already been returned by the final g.Wait(), and on an early
		// return the setup error being returned is the relevant one.
		_ = g.Wait()
	}()

	modeProvider, err := resolveModeProvider(a.log, deploymentType)
	if err != nil {
		return err
	}

	dm := cluster.Dynamic{
		Provider:         modeProvider,
		GatewayComponent: true,
	}
	g.Go(func() error {
		return dm.Run(ctx)
	})

	var gw gateway.HandleT
	rateLimiter, err := gwThrottler.New(stats.Default)
	if err != nil {
		return fmt.Errorf("failed to create rate limiter: %w", err)
	}
	rsourcesService, err := NewRsourcesService(deploymentType)
	if err != nil {
		return err
	}
	err = gw.Setup(
		ctx,
		a.app, backendconfig.DefaultBackendConfig, gatewayDB,
		rateLimiter, a.versionHandler, rsourcesService, sourceHandle,
	)
	if err != nil {
		return fmt.Errorf("failed to setup gateway: %w", err)
	}
	defer func() {
		// Shutdown failures are logged rather than returned so they do not
		// mask the error (if any) that ended the run.
		if err := gw.Shutdown(); err != nil {
			a.log.Warnf("Gateway shutdown error: %v", err)
		}
	}()

	g.Go(func() error {
		return gw.StartAdminHandler(ctx)
	})
	g.Go(func() error {
		return gw.StartWebHandler(ctx)
	})
	return g.Wait()
}