/
server.go
101 lines (92 loc) · 2.55 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Copyright 2022 The kubegems.io Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"context"
"crypto/tls"
"net/http"
"golang.org/x/sync/errgroup"
"kubegems.io/kubegems/pkg/edge/tunnel"
"kubegems.io/kubegems/pkg/log"
"kubegems.io/kubegems/pkg/utils/pprof"
"kubegems.io/kubegems/pkg/utils/system"
"kubegems.io/library/rest/api"
)
// Run constructs an EdgeServer from options and runs it with ctx.
//
// If NewEdgeServer fails, its error is returned unchanged; otherwise the
// result of the server's own Run method is returned.
func Run(ctx context.Context, options *Options) error {
	edge, err := NewEdgeServer(ctx, options)
	if err == nil {
		return edge.Run(ctx)
	}
	return err
}
// EdgeServer groups the gRPC tunnel server, the edge cluster manager, the TLS
// configuration and the options it was constructed from.
type EdgeServer struct {
// server is the gRPC tunnel server; its TunnelServer is also passed to the
// HTTP API and to the cluster manager's tunnel status sync.
server *tunnel.GrpcTunnelServer
// clusters is the edge cluster manager created by NewClusterManager.
clusters *EdgeManager
// tlsConfig is the TLS configuration derived from options.TLS.
tlsConfig *tls.Config
// options holds the options this server was created with (listen addresses, etc.).
options *Options
}
// NewEdgeServer assembles an EdgeServer from options.
//
// It first converts options.TLS into a *tls.Config, then creates the cluster
// manager via NewClusterManager (with an empty first string argument and
// options.Host), and finally creates a tunnel server identified by
// options.ServerID. The first error encountered is returned as-is together
// with a nil server.
func NewEdgeServer(ctx context.Context, options *Options) (*EdgeServer, error) {
	tlsCfg, err := options.TLS.ToTLSConfig()
	if err != nil {
		return nil, err
	}

	manager, err := NewClusterManager(ctx, "", options.Host)
	if err != nil {
		return nil, err
	}

	// The tunnel server is only created once both prerequisites succeeded.
	grpcTunnel := &tunnel.GrpcTunnelServer{
		TunnelServer: tunnel.NewTunnelServer(options.ServerID, nil),
	}

	return &EdgeServer{
		server:    grpcTunnel,
		clusters:  manager,
		tlsConfig: tlsCfg,
		options:   options,
	}, nil
}
// Run starts every component of the edge server in an errgroup and blocks
// until the group finishes, returning the group's Wait result.
//
// Listeners:
//   - When options.Listen and options.ListenGrpc differ, gRPC is served on
//     ListenGrpc with the server's TLS config, and the HTTP API is served on
//     Listen with a nil TLS config.
//   - When they are equal, a single listener on that address serves both the
//     HTTP API and the gRPC server, using the server's TLS config.
//
// In addition, the tunnel status sync of the cluster manager and pprof are
// started in the same group. All goroutines receive the errgroup-derived
// context, which also carries log.LogrLogger.
func (s *EdgeServer) Run(ctx context.Context) error {
	group, ctx := errgroup.WithContext(log.NewContext(ctx, log.LogrLogger))

	if s.options.Listen != s.options.ListenGrpc {
		// Separate addresses: one goroutine per protocol.
		group.Go(func() error {
			return s.server.ServeGrpc(ctx, s.options.ListenGrpc, s.tlsConfig)
		})
		group.Go(func() error {
			return system.ListenAndServeContext(ctx, s.options.Listen, nil, s.HTTPAPI())
		})
	} else {
		// Shared address: HTTP and gRPC are served by the same listener.
		group.Go(func() error {
			httpHandler := s.HTTPAPI()
			grpcServer := s.server.GrpcServer(s.tlsConfig)
			return system.ListenAndServeContextGRPCAndHTTP(ctx, s.options.Listen, s.tlsConfig, httpHandler, grpcServer)
		})
	}

	group.Go(func() error {
		return s.clusters.SyncTunnelStatusFrom(ctx, s.server.TunnelServer)
	})
	group.Go(func() error {
		return pprof.Run(ctx)
	})

	return group.Wait()
}
// HTTPAPI builds the HTTP handler for the edge server.
//
// It wires an EdgeClusterAPI to the server's cluster manager and tunnel
// server, registers it under the "/v1" prefix on a new API, and returns the
// handler produced by BuildHandler.
func (s *EdgeServer) HTTPAPI() http.Handler {
	clusterAPI := &EdgeClusterAPI{
		Cluster: s.clusters,
		Tunnel:  s.server.TunnelServer,
	}
	registered := api.NewAPI().Register("/v1", clusterAPI)
	return registered.BuildHandler()
}