forked from minio/minio
-
Notifications
You must be signed in to change notification settings - Fork 2
/
gateway-main.go
345 lines (282 loc) · 9.68 KB
/
gateway-main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
/*
* MinIO Cloud Storage, (C) 2017-2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"fmt"
"net"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"github.com/gorilla/mux"
"github.com/minio/cli"
xhttp "storj.io/minio/cmd/http"
"storj.io/minio/cmd/logger"
"storj.io/minio/pkg/certs"
"storj.io/minio/pkg/color"
"storj.io/minio/pkg/env"
)
// gatewayCmd is the top-level "gateway" CLI command. Its flag set is
// ServerFlags followed by GlobalFlags; backend sub-commands are appended to
// its Subcommands by RegisterGatewayCommand.
var gatewayCmd = cli.Command{
	Name:            "gateway",
	Usage:           "start object storage gateway",
	HideHelpCommand: true,
	Flags:           append(ServerFlags, GlobalFlags...),
}
// GatewayLocker wraps a gateway ObjectLayer and supplies its own NewNSLock
// and Walk implementations (defined on *GatewayLocker in this file). Every
// other ObjectLayer method is promoted from the embedded value.
type GatewayLocker struct {
// ObjectLayer is the wrapped gateway backend.
ObjectLayer
// nsMutex is the lock map NewNSLock uses to create namespace locks.
nsMutex *nsLockMap
}
// NewNSLock returns a lock covering the given bucket and objects, created
// from the gateway's own namespace lock map.
func (l *GatewayLocker) NewNSLock(bucket string, objects ...string) RWLocker {
	// NOTE(review): nil is passed as the lock map's first argument; presumably
	// this means "no remote lockers" - confirm against nsLockMap.NewNSLock.
	nsLocker := l.nsMutex.NewNSLock(nil, bucket, objects...)
	return nsLocker
}
// Walk streams every object found under prefix in bucket into results.
//
// The wrapped ObjectLayer's Walk is tried first. A nil result or any error
// other than NotImplemented is returned as is. When the backend reports
// NotImplemented, a background goroutine pages through ListObjects instead,
// sends each object on results and closes results when it finishes, hits a
// listing error (which is logged, not returned) or ctx is cancelled. In that
// fallback case Walk itself returns nil immediately.
func (l *GatewayLocker) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
	err := l.ObjectLayer.Walk(ctx, bucket, prefix, results, opts)
	if err == nil {
		return nil
	}
	if _, unsupported := err.(NotImplemented); !unsupported {
		return err
	}

	// Fallback: emulate Walk with paginated listing.
	go func() {
		// Closing the channel tells the reader that no more objects will arrive.
		defer close(results)

		marker := ""
		for more := true; more; {
			// maxKeys of '0' asks for the maximum possible objects in a single call.
			page, listErr := l.ObjectLayer.ListObjects(ctx, bucket, prefix, marker, "", 0)
			if listErr != nil {
				logger.LogIf(ctx, listErr)
				return
			}

			for i := range page.Objects {
				select {
				case <-ctx.Done():
					return
				case results <- page.Objects[i]:
				}
			}

			// Continue from where this page ended, as long as it was truncated.
			marker, more = page.NextMarker, page.IsTruncated
		}
	}()

	return nil
}
// NewGatewayLayerWithLocker wraps gwLayer in a GatewayLocker that carries a
// freshly created namespace lock map and returns it as an ObjectLayer.
func NewGatewayLayerWithLocker(gwLayer ObjectLayer) ObjectLayer {
	wrapped := new(GatewayLocker)
	wrapped.ObjectLayer = gwLayer
	// NOTE(review): the false argument presumably selects a non-distributed
	// lock map - confirm against newNSLock.
	wrapped.nsMutex = newNSLock(false)
	return wrapped
}
// RegisterGatewayCommand adds cmd as a sub-command of the gateway command.
// ServerFlags and then GlobalFlags are appended to the command's own flags
// before it is registered. The returned error is always nil.
func RegisterGatewayCommand(cmd cli.Command) error {
	flags := append(cmd.Flags, ServerFlags...)
	flags = append(flags, GlobalFlags...)
	cmd.Flags = flags

	gatewayCmd.Subcommands = append(gatewayCmd.Subcommands, cmd)
	return nil
}
// ParseGatewayEndpoint splits a gateway endpoint argument into its host part
// (host[:port]) and a flag telling whether the connection is secure.
//
// An argument without a "://" separator is treated as "https". Only the
// "http" (secure == false) and "https" (secure == true) schemes are accepted;
// any other scheme, or an argument url.Parse rejects, yields an error together
// with an empty endpoint and secure == false.
func ParseGatewayEndpoint(arg string) (endPoint string, secure bool, err error) {
	if !strings.Contains(arg, "://") {
		// Default connection will be "secure".
		arg = "https://" + arg
	}

	parsed, parseErr := url.Parse(arg)
	if parseErr != nil {
		return "", false, parseErr
	}

	if parsed.Scheme == "http" {
		return parsed.Host, false, nil
	}
	if parsed.Scheme == "https" {
		return parsed.Host, true, nil
	}
	return "", false, fmt.Errorf("Unrecognized scheme %s", parsed.Scheme)
}
// ValidateGatewayArguments checks the gateway's listen address and, when one
// is given, the backend endpoint address.
//
// serverAddr must pass CheckLocalServerAddr. A non-empty endpointAddr is then
// compared against serverAddr with sameLocalAddrs and rejected when both refer
// to the same local target, i.e. the endpoint would be the gateway itself.
// Errors from either helper are returned unchanged.
func ValidateGatewayArguments(serverAddr, endpointAddr string) error {
	if err := CheckLocalServerAddr(serverAddr); err != nil {
		return err
	}

	// Nothing more to verify without an endpoint.
	if endpointAddr == "" {
		return nil
	}

	switch loopsBack, err := sameLocalAddrs(endpointAddr, serverAddr); {
	case err != nil:
		return err
	case loopsBack:
		return fmt.Errorf("endpoint points to the local gateway")
	}
	return nil
}
// StartGateway - handler for 'minio gateway <name>'.
//
// In order, it: sets the deployment ID, installs the console logger, handles
// common command arguments, loads TLS certificates and root CAs, checks that
// the listen port is available, initializes the server config, builds the
// HTTP router, starts the HTTP server in a goroutine, creates the backend
// object layer through gw.NewGatewayLayer, wraps it with
// NewGatewayLayerWithLocker and publishes it as globalObjectAPI, optionally
// sets up disk caching, prints the startup banner unless quiet mode is on and
// finally calls handleSignals. Setup failures are reported through
// logger.FatalIf / logger.Fatal.
func StartGateway(ctx *cli.Context, gw Gateway) {
defer globalDNSCache.Stop()
// This is only to uniquely identify each gateway deployments.
// The freshly generated UUID is presumably the fallback used when
// MINIO_GATEWAY_DEPLOYMENT_ID is not set - confirm against env.Get.
globalDeploymentID = env.Get("MINIO_GATEWAY_DEPLOYMENT_ID", mustGetUUID())
logger.SetDeploymentID(globalDeploymentID)
if gw == nil {
logger.FatalIf(errUnexpected, "Gateway implementation not initialized")
}
// Record the gateway name globally and keep a local copy for use below.
globalGatewayName = gw.Name()
gatewayName := gw.Name()
// A first argument of "help" shows the command help and exits
// (1 is passed as the exit code argument).
if ctx.Args().First() == "help" {
cli.ShowCommandHelpAndExit(ctx, gatewayName, 1)
}
// Initialize globalConsoleSys system
globalConsoleSys = NewConsoleLogger(GlobalContext)
logger.AddTarget(globalConsoleSys)
// Handle common command args.
handleCommonCmdArgs(ctx)
// Check and load TLS certificates.
var err error
globalPublicCerts, globalTLSCerts, GlobalIsTLS, err = getTLSConfig()
logger.FatalIf(err, "Invalid TLS certificate file")
// Check and load Root CAs.
globalRootCAs, err = certs.GetRootCAs(globalCertsCADir.Get())
logger.FatalIf(err, "Failed to read root CAs (%v)", err)
// Add the global public crts as part of global root CAs
for _, publicCrt := range globalPublicCerts {
globalRootCAs.AddCert(publicCrt)
}
// Register root CAs for remote ENVs
env.RegisterGlobalCAs(globalRootCAs)
// Initialize all help
initHelp()
// Get port to listen on from gateway address
globalMinioHost, globalMinioPort = mustSplitHostPort(GlobalCLIContext.Addr)
// On macOS, if a process already listens on LOCALIPADDR:PORT, net.Listen() falls back
// to IPv6 address ie minio will start listening on IPv6 address whereas another
// (non-)minio process is listening on IPv4 of given port.
// To avoid this error situation we check for port availability.
logger.FatalIf(checkPortAvailability(globalMinioHost, globalMinioPort), "Unable to start the gateway")
// Build the endpoint URL as scheme://host:port. When no host was given,
// the first entry of the sorted local IPv4 list is used instead.
globalMinioEndpoint = func() string {
host := globalMinioHost
if host == "" {
host = sortIPs(localIP4.ToSlice())[0]
}
return fmt.Sprintf("%s://%s", getURLScheme(GlobalIsTLS), net.JoinHostPort(host, globalMinioPort))
}()
// Handle gateway specific env
gatewayHandleEnvVars()
// Set system resources to maximum.
setMaxResources()
// Set when gateway is enabled
GlobalIsGateway = true
// Passed to registerAdminRouter below; it is never set to true in this function.
enableConfigOps := false
// TODO: We need to move this code with globalConfigSys.Init()
// for now keep it here such that "s3" gateway layer initializes
// itself properly when KMS is set.
// Initialize server config.
srvCfg := newServerConfig()
// Override any values from ENVs.
lookupConfigs(srvCfg, nil)
// hold the mutex lock before a new config is assigned.
globalServerConfigMu.Lock()
globalServerConfig = srvCfg
globalServerConfigMu.Unlock()
// Initialize router. `SkipClean(true)` stops gorilla/mux from
// normalizing URL path minio/minio#3256
// avoid URL path encoding minio/minio#8950
router := mux.NewRouter().SkipClean(true).UseEncodedPath()
// Enable IAM admin APIs if etcd is enabled, if not just enable basic
// operations such as profiling, server info etc.
// (enableConfigOps is false at this point.)
registerAdminRouter(router, enableConfigOps, false)
// Add healthcheck router
registerHealthCheckRouter(router)
// Add server metrics router
registerMetricsRouter(router)
// Register web router when its enabled.
if globalBrowserEnabled {
logger.FatalIf(registerWebRouter(router), "Unable to configure web browser")
}
// Add API router.
registerAPIRouter(router)
// Use all the middlewares
router.Use(GlobalHandlers...)
// getCert stays nil when no TLS certificates were loaded.
var getCert certs.GetCertificateFunc
if globalTLSCerts != nil {
getCert = globalTLSCerts.GetCertificate
}
httpServer := xhttp.NewServer([]string{GlobalCLIContext.Addr},
criticalErrorHandler{corsHandler(router)}, getCert)
// Every listener uses GlobalContext as its base context.
httpServer.BaseContext = func(listener net.Listener) context.Context {
return GlobalContext
}
// Run the server in the background; the value returned by Start is sent
// on globalHTTPServerErrorCh.
go func() {
globalHTTPServerErrorCh <- httpServer.Start()
}()
// Publish the server handle under globalObjLayerMutex.
globalObjLayerMutex.Lock()
globalHTTPServer = httpServer
globalObjLayerMutex.Unlock()
// Deliver interrupt, SIGTERM and SIGQUIT to globalOSSignalCh.
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
// Create the backend object layer; on failure the HTTP server is shut
// down before the fatal error is reported.
newObject, err := gw.NewGatewayLayer(globalActiveCred)
if err != nil {
globalHTTPServer.Shutdown()
logger.FatalIf(err, "Unable to initialize gateway backend")
}
// Wrap the backend with the gateway level locker and walker.
newObject = NewGatewayLayerWithLocker(newObject)
// Calls all New() for all sub-systems.
newAllSubsystems()
// Publish the object layer as globalObjectAPI under globalObjLayerMutex.
globalObjLayerMutex.Lock()
globalObjectAPI = newObject
globalObjLayerMutex.Unlock()
// Only the NAS backend initializes the notification system here, using
// the bucket list it reports.
if gatewayName == NASBackendGateway {
buckets, err := newObject.ListBuckets(GlobalContext)
if err != nil {
logger.Fatal(err, "Unable to list buckets")
}
logger.FatalIf(GlobalNotificationSys.Init(GlobalContext, buckets, newObject), "Unable to initialize notification system")
}
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
globalObjLayerMutex.Lock()
globalCacheObjectAPI = cacheAPI
globalObjLayerMutex.Unlock()
}
// Verify if object layer supports
// - encryption
// - compression
verifyObjectLayerFeatures("gateway "+gatewayName, newObject)
// Prints the formatted startup message once object layer is initialized.
if !GlobalCLIContext.Quiet {
mode := globalMinioModeGatewayPrefix + gatewayName
// Check update mode.
checkUpdate(mode)
// Print a warning message if gateway is not ready for production before the startup banner.
if !gw.Production() {
logStartupMessage(color.Yellow(" *** Warning: Not Ready for Production ***"))
}
// Print gateway startup message.
printGatewayStartupMessage(getAPIEndpoints(), gatewayName)
}
// Last step of startup: hand control to the signal handler.
handleSignals()
}