generated from layer5io/layer5-repo-template
-
Notifications
You must be signed in to change notification settings - Fork 95
/
meshery_broker.go
149 lines (136 loc) · 4.36 KB
/
meshery_broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package controllers
import (
"context"
"fmt"
"strings"
opClient "github.com/layer5io/meshery-operator/pkg/client"
mesherykube "github.com/layer5io/meshkit/utils/kubernetes"
v1 "k8s.io/api/core/v1"
kubeerror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubectl/pkg/polymorphichelpers"
)
var (
brokerMonitoringPortName = "monitor"
)
type mesheryBroker struct {
name string
status MesheryControllerStatus
kclient *mesherykube.Client
}
func NewMesheryBrokerHandler(kubernetesClient *mesherykube.Client) IMesheryController {
return &mesheryBroker{
name: "MesheryBroker",
status: Unknown,
kclient: kubernetesClient,
}
}
func (mb *mesheryBroker) GetName() string {
return mb.name
}
func (mb *mesheryBroker) GetStatus() MesheryControllerStatus {
operatorClient, err := opClient.New(&mb.kclient.RestConfig)
if err != nil || operatorClient == nil {
return Unknown
}
// TODO: Confirm if the presence of operator is needed to use the operator client sdk
_, err = operatorClient.CoreV1Alpha1().Brokers("meshery").Get(context.TODO(), "meshery-broker", metav1.GetOptions{})
if err == nil {
var monitoringEndpoint string
monitoringEndpoint, err = mb.GetEndpointForPort(brokerMonitoringPortName)
if err == nil {
if ConnectivityTest(MesheryServer, monitoringEndpoint) {
mb.status = Connected
return mb.status
}
}
mb.status = Deployed
return mb.status
} else {
if kubeerror.IsNotFound(err) {
if mb.status != Undeployed {
mb.status = Undeployed
}
return mb.status
}
// when operatorClient is not able to get meshesry-broker, we try again with kubernetes client as a fallback
broker, err := mb.kclient.DynamicKubeClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}).Namespace("meshery").Get(context.TODO(), MesheryBroker, metav1.GetOptions{})
if err != nil {
// if the resource is not found, then it is NotDeployed
if kubeerror.IsNotFound(err) {
mb.status = Undeployed
return mb.status
}
return Unknown
}
mb.status = Deploying
sv, er := polymorphichelpers.StatusViewerFor(broker.GroupVersionKind().GroupKind())
if er != nil {
mb.status = Unknown
return mb.status
}
_, done, statusErr := sv.Status(broker, 0)
if statusErr != nil {
mb.status = Unknown
return mb.status
}
if done {
mb.status = Deployed
}
return mb.status
}
}
func (mb *mesheryBroker) Deploy(force bool) error {
// deploying the operator will deploy broker. Right now, we don't need to implement this functionality. But we may implement in the future
return nil
}
func (mb *mesheryBroker) Undeploy() error {
// currently we do not allow the manual undeployment of broker
return nil
}
func (mb *mesheryBroker) GetPublicEndpoint() (string, error) {
operatorClient, err := opClient.New(&mb.kclient.RestConfig)
if err != nil {
return "", ErrGetControllerPublicEndpoint(err)
}
broker, err := operatorClient.CoreV1Alpha1().Brokers("meshery").Get(context.TODO(), MesheryBroker, metav1.GetOptions{})
if broker.Status.Endpoint.External == "" {
if err == nil {
err = fmt.Errorf("Could not get the External endpoint for meshery-broker")
}
// broker is not available
return "", ErrGetControllerPublicEndpoint(err)
}
return GetBrokerEndpoint(mb.kclient, broker), nil
}
func (mb *mesheryBroker) GetVersion() (string, error) {
statefulSet, err := mb.kclient.KubeClient.AppsV1().StatefulSets("meshery").Get(context.TODO(), MesheryBroker, metav1.GetOptions{})
if kubeerror.IsNotFound(err) {
return "", err
}
return getImageVersionOfContainer(statefulSet.Spec.Template, "nats"), nil
}
func (mb *mesheryBroker) GetEndpointForPort(portName string) (string, error) {
endpoint, err := mesherykube.GetServiceEndpoint(context.TODO(), mb.kclient.KubeClient, &mesherykube.ServiceOptions{
Name: "meshery-broker",
Namespace: "meshery",
PortSelector: portName,
})
if err != nil {
return "", err
}
return endpoint.External.String(), nil
}
func getImageVersionOfContainer(container v1.PodTemplateSpec, containerName string) string {
var version string
for _, container := range container.Spec.Containers {
if strings.Compare(container.Name, containerName) == 0 {
versionTag := strings.Split(container.Image, ":")
if len(versionTag) > 1 {
version = versionTag[1]
}
}
}
return version
}