-
Notifications
You must be signed in to change notification settings - Fork 72
/
kafka.go
80 lines (72 loc) · 2.38 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package services
import (
"github.com/redhat-developer/app-services-cli/pkg/cluster/constants"
"github.com/redhat-developer/app-services-cli/pkg/cluster/kubeclient"
"github.com/redhat-developer/app-services-cli/pkg/cluster/services/resources"
"github.com/redhat-developer/app-services-cli/pkg/cluster/v1alpha"
"github.com/redhat-developer/app-services-cli/pkg/kafka"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// KafkaService contains methods to connect and bind Kafka Service instance to cluster
type KafkaService struct {
CommandEnvironment *v1alpha.CommandEnvironment
KubernetesClients *kubeclient.KubernetesClients
}
func (s KafkaService) BuildServiceDetails(serviceName string, namespace string, ignoreContext bool) (*ServiceDetails, error) {
cliOpts := s.CommandEnvironment
cfg, err := cliOpts.Config.Load()
if err != nil {
return nil, err
}
api := cliOpts.Connection.API()
var serviceId string
if serviceName == "" {
if cfg.Services.Kafka == nil || ignoreContext {
// nolint
selectedService, err := kafka.InteractiveSelect(cliOpts.Context, cliOpts.Connection, cliOpts.Logger, cliOpts.Localizer)
if err != nil {
return nil, err
}
if selectedService == nil {
return nil, nil
}
serviceId = selectedService.GetId()
serviceName = selectedService.GetName()
} else {
serviceId = cfg.Services.Kafka.ClusterID
selectedService, _, err := kafka.GetKafkaByID(cliOpts.Context, api.Kafka(), serviceId)
if err != nil {
return nil, err
}
serviceName = selectedService.GetName()
}
} else {
selectedService, _, err := kafka.GetKafkaByName(cliOpts.Context, api.Kafka(), serviceName)
if err != nil {
return nil, err
}
serviceId = selectedService.GetId()
}
kafkaConnectionCR := &resources.KafkaConnection{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: namespace,
},
TypeMeta: resources.AKCRMeta,
Spec: resources.KafkaConnectionSpec{
KafkaID: serviceId,
AccessTokenSecretName: constants.TokenSecretName,
Credentials: resources.KafkaCredentialsSpec{
SecretName: constants.ServiceAccountSecretName,
},
},
}
serviceDetails := ServiceDetails{
ID: serviceId,
Name: serviceName,
KubernetesResource: kafkaConnectionCR,
GroupMetadata: resources.AKCResource,
Type: resources.KafkaServiceName,
}
return &serviceDetails, nil
}