-
Notifications
You must be signed in to change notification settings - Fork 153
/
server.go
121 lines (99 loc) · 3.29 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package server
import (
"context"
"errors"
"fmt"
"github.com/go-logr/logr"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/weaveworks/weave-gitops/core/cache"
"github.com/weaveworks/weave-gitops/core/clustersmngr"
"github.com/weaveworks/weave-gitops/core/nsaccess"
pb "github.com/weaveworks/weave-gitops/pkg/api/core"
"github.com/weaveworks/weave-gitops/pkg/kube"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)
func Hydrate(ctx context.Context, mux *runtime.ServeMux, cfg CoreServerConfig) error {
appsServer, err := NewCoreServer(cfg)
if err != nil {
return fmt.Errorf("unable to create new kube client: %w", err)
}
if err = pb.RegisterCoreHandlerServer(ctx, mux, appsServer); err != nil {
return fmt.Errorf("could not register new app server: %w", err)
}
return nil
}
const temporarilyEmptyAppName = ""
type ClientGetterFn func(ctx context.Context) clustersmngr.Client
type coreServer struct {
pb.UnimplementedCoreServer
k8s kube.ClientGetter
cacheContainer cache.Container
logger logr.Logger
nsChecker nsaccess.Checker
}
type CoreServerConfig struct {
log logr.Logger
RestCfg *rest.Config
clusterName string
NSAccess nsaccess.Checker
CacheContainer cache.Container
}
func NewCoreConfig(log logr.Logger, cfg *rest.Config, cacheContainer cache.Container, clusterName string) CoreServerConfig {
return CoreServerConfig{
log: log.WithName("core-server"),
RestCfg: cfg,
clusterName: clusterName,
NSAccess: nsaccess.NewChecker(nsaccess.DefautltWegoAppRules),
CacheContainer: cacheContainer,
}
}
func NewCoreServer(cfg CoreServerConfig) (pb.CoreServer, error) {
ctx := context.Background()
cfgGetter := kube.NewImpersonatingConfigGetter(cfg.RestCfg, false)
cfg.CacheContainer.Start(ctx)
return &coreServer{
k8s: kube.NewDefaultClientGetter(cfgGetter, cfg.clusterName),
logger: cfg.log,
cacheContainer: cfg.CacheContainer,
nsChecker: cfg.NSAccess,
}, nil
}
func list(ctx context.Context, k8s client.Client, appName, namespace string, list client.ObjectList, extraOpts ...client.ListOption) error {
opts := []client.ListOption{
getMatchingLabels(appName),
client.InNamespace(namespace),
}
opts = append(opts, extraOpts...)
err := k8s.List(ctx, list, opts...)
err = wrapK8sAPIError("list resource", err)
return err
}
func wrapK8sAPIError(msg string, err error) error {
if k8serrors.IsUnauthorized(err) {
return status.Errorf(codes.PermissionDenied, err.Error())
} else if k8serrors.IsNotFound(err) {
return status.Errorf(codes.NotFound, err.Error())
} else if err != nil {
return fmt.Errorf("%s: %w", msg, err)
}
return nil
}
func doClientError(err error) error {
return status.Errorf(codes.Internal, "unable to make k8s rest client: %s", err.Error())
}
func (cs *coreServer) namespaces() (map[string][]v1.Namespace, error) {
objs, err := cs.cacheContainer.List(cache.NamespaceStorage)
if err != nil {
return nil, err
}
namespaces, ok := objs.(map[string][]v1.Namespace)
if !ok {
return nil, errors.New("could not convert objects to map[string][]v1.Namespace")
}
return namespaces, nil
}