-
Notifications
You must be signed in to change notification settings - Fork 72
/
kafka.go
94 lines (84 loc) · 3.07 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package services
import (
"github.com/redhat-developer/app-services-cli/pkg/core/localize"
"github.com/redhat-developer/app-services-cli/pkg/shared/cluster/constants"
"github.com/redhat-developer/app-services-cli/pkg/shared/cluster/kubeclient"
"github.com/redhat-developer/app-services-cli/pkg/shared/cluster/services/resources"
"github.com/redhat-developer/app-services-cli/pkg/shared/cluster/v1alpha"
"github.com/redhat-developer/app-services-cli/pkg/shared/contextutil"
"github.com/redhat-developer/app-services-cli/pkg/shared/kafkautil"
"github.com/redhat-developer/app-services-cli/pkg/shared/servicespec"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// KafkaService contains methods to connect and bind Kafka Service instance to cluster
type KafkaService struct {
CommandEnvironment *v1alpha.CommandEnvironment
KubernetesClients *kubeclient.KubernetesClients
}
func (s KafkaService) BuildServiceDetails(serviceName string, namespace string, ignoreContext bool) (*ServiceDetails, error) {
cliOpts := s.CommandEnvironment
svcContext, err := cliOpts.ServiceContext.Load()
if err != nil {
return nil, err
}
currCtx, err := contextutil.GetCurrentContext(svcContext, cliOpts.Localizer)
if err != nil {
return nil, err
}
api := cliOpts.Connection.API()
var serviceId string
if serviceName == "" {
if currCtx.KafkaID == "" || ignoreContext {
// nolint
selectedService, err := kafkautil.InteractiveSelect(cliOpts.Context, cliOpts.Connection, cliOpts.Logger, cliOpts.Localizer)
if err != nil {
return nil, err
}
if selectedService == nil {
return nil, nil
}
serviceId = selectedService.GetId()
serviceName = selectedService.GetName()
} else {
serviceId = currCtx.KafkaID
selectedService, _, err := kafkautil.GetKafkaByID(cliOpts.Context, api.KafkaMgmt(), serviceId)
if err != nil {
return nil, err
}
serviceName = selectedService.GetName()
}
} else {
selectedService, _, err := kafkautil.GetKafkaByName(cliOpts.Context, api.KafkaMgmt(), serviceName)
if err != nil {
return nil, err
}
serviceId = selectedService.GetId()
}
kafkaConnectionCR := &resources.KafkaConnection{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: namespace,
},
TypeMeta: resources.AKCRMeta,
Spec: resources.KafkaConnectionSpec{
KafkaID: serviceId,
AccessTokenSecretName: constants.TokenSecretName,
Credentials: resources.KafkaCredentialsSpec{
SecretName: constants.ServiceAccountSecretName,
},
},
}
serviceDetails := ServiceDetails{
ID: serviceId,
Name: serviceName,
KubernetesResource: kafkaConnectionCR,
GroupMetadata: resources.AKCResource,
Type: servicespec.KafkaServiceName,
}
return &serviceDetails, nil
}
// PrintAccessCommands prints command to grant service account acccess to the Kafka instance
func (s KafkaService) PrintAccessCommands(clientID string) {
cliOpts := s.CommandEnvironment
cliOpts.Logger.Info(cliOpts.Localizer.MustLocalize("cluster.kubernetes.printKafkaAccessCommands", localize.NewEntry("ClientID", clientID)))
}