Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tools/nvim/dap/go.lua
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ dap.configurations.go = {
name = "Debug status-n-billing",
request = "launch",
program = vim.g.root_dir .. "/operators/status-n-billing",
args = { "--dev" },
args = { "--dev", "--serverHost", "localhost:8081" },
-- console = "externalTerminal",
-- externalTerminal = true,
envFile = {
Expand Down
30 changes: 20 additions & 10 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ func (g *grpcHandler) handleMessage(msg t.AgentMessage) error {
obj := unstructured.Unstructured{Object: msg.Object}
mLogger := g.logger.WithKV("gvk", obj.GetObjectKind().GroupVersionKind().String()).WithKV("clusterName", msg.ClusterName).WithKV("accountName", msg.AccountName).WithKV("action", msg.Action)

mLogger.Infof("received message [%d]", g.inMemCounter)
defer func() {
mLogger.Infof("processed message [%d]", g.inMemCounter)
}()
mLogger.Infof("[%d] received message", g.inMemCounter)
// defer func() {
// mLogger.Infof("processed message [%d]", g.inMemCounter)
// }()

if len(strings.TrimSpace(msg.AccountName)) == 0 {
return g.handleErrorOnApply(ctx, fmt.Errorf("field 'accountName' must be defined in message"), msg)
}

switch msg.Action {
case "apply", "delete", "create":
case "apply", "delete":
{
b, err := yaml.Marshal(msg.Object)
if err != nil {
Expand All @@ -90,23 +90,31 @@ func (g *grpcHandler) handleMessage(msg t.AgentMessage) error {
if msg.Action == "apply" {
_, err := g.yamlClient.ApplyYAML(ctx, b)
if err != nil {
mLogger.Infof("[%d] [error-on-apply]: %s", g.inMemCounter, err.Error())
mLogger.Infof("[%d] failed to process message", g.inMemCounter)
return g.handleErrorOnApply(ctx, err, msg)
}
mLogger.Infof("[%d] processed message", g.inMemCounter)
return nil
}

if msg.Action == "delete" {
err := g.yamlClient.DeleteYAML(ctx, b)
if err != nil {
mLogger.Infof("[%d] [error-on-delete]: %s", err.Error())
return g.handleErrorOnApply(ctx, err, msg)
}
mLogger.Infof("[%d] processed message", g.inMemCounter)
return nil
}
return nil
}
default:
{
return g.handleErrorOnApply(ctx, fmt.Errorf("invalid action (%s)", msg.Action), msg)
err := fmt.Errorf("invalid action (%s)", msg.Action)
mLogger.Infof("[%d] [error]: %s", err.Error())
mLogger.Infof("[%d] failed to process message", g.inMemCounter)
return g.handleErrorOnApply(ctx, err, msg)
}
}
}
Expand Down Expand Up @@ -215,9 +223,9 @@ func main() {

for {
cc, err := func() (*grpc.ClientConn, error) {
if isDev {
return libGrpc.Connect(ev.GrpcAddr)
}
// if isDev {
// return libGrpc.Connect(ev.GrpcAddr)
// }
return libGrpc.ConnectSecure(ev.GrpcAddr)
}()

Expand All @@ -228,7 +236,9 @@ func main() {
logger.Infof("GRPC connection successful")

g.msgDispatchCli = messages.NewMessageDispatchServiceClient(cc)
g.run(cc)
if err := g.run(cc); err != nil {
logger.Errorf(err, "running grpc sendActions")
}

connState := cc.GetState()
for connState != connectivity.Ready && connState != connectivity.Shutdown {
Expand Down
9 changes: 5 additions & 4 deletions apis/clusters/v1/byoc_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
)

type BYOCSpec struct {
Region string `json:"region"`
Provider string `json:"provider"`
AccountName string `json:"accountName"`
Region string `json:"region"`
Provider string `json:"provider"`
AccountName string `json:"accountName"`
IncomingKafkaTopic string `json:"incomingKafkaTopic"`

DisplayName string `json:"displayName,omitempty"`
DisplayName string `json:"displayName,omitempty"`
StorageClasses []string `json:"storageClasses,omitempty"`
IngressClasses []string `json:"ingressClasses,omitempty"`
PublicIPs []string `json:"publicIps,omitempty"`
Expand Down
6 changes: 1 addition & 5 deletions apis/crds/v1/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ type Config struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

AccountName string `json:"accountName"`
ClusterName string `json:"clusterName"`

ProjectName string `json:"projectName,omitempty"`
Data map[string]string `json:"data,omitempty"`
Data map[string]string `json:"data,omitempty"`

// +kubebuilder:default=true
Enabled bool `json:"enabled,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion apis/redpanda.msvc/v1/topic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type TopicSpec struct {
RedpandaAdmin string `json:"redpandaAdmin"`
RedpandaAdmin string `json:"redpandaAdmin,omitempty"`

// +kubebuilder:default=3
PartitionCount int `json:"partitionCount,omitempty"`
Expand Down
20 changes: 0 additions & 20 deletions config/crd/bases/_.yaml

This file was deleted.

3 changes: 3 additions & 0 deletions config/crd/bases/clusters.kloudlite.io_byocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ spec:
type: string
displayName:
type: string
incomingKafkaTopic:
type: string
ingressClasses:
items:
type: string
Expand All @@ -72,6 +74,7 @@ spec:
type: array
required:
- accountName
- incomingKafkaTopic
- provider
- region
type: object
Expand Down
9 changes: 0 additions & 9 deletions config/crd/bases/crds.kloudlite.io_configs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,11 @@ spec:
openAPIV3Schema:
description: Config is the Schema for the configs API
properties:
accountName:
type: string
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
clusterName:
type: string
data:
additionalProperties:
type: string
Expand Down Expand Up @@ -69,8 +65,6 @@ spec:
type: object
type: array
type: object
projectName:
type: string
status:
properties:
checks:
Expand Down Expand Up @@ -348,9 +342,6 @@ spec:
type: object
type: array
type: object
required:
- accountName
- clusterName
type: object
served: true
storage: true
Expand Down
2 changes: 0 additions & 2 deletions config/crd/bases/redpanda.msvc.kloudlite.io_topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ spec:
type: integer
redpandaAdmin:
type: string
required:
- redpandaAdmin
type: object
status:
properties:
Expand Down
10 changes: 5 additions & 5 deletions operators/byoc-client-operator/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ type Env struct {
// HelmReleaseName string `env:"HELM_RELEASE_NAME" required:"true"`
HelmReleaseNamespace string `env:"HELM_RELEASE_NAMESPACE" required:"true"`

KafkaTopicBYOCClientUpdates string `env:"KAFKA_TOPIC_BYOC_CLIENT_UPDATES" required:"true"`

KafkaBrokers string `env:"KAFKA_BROKERS" required:"true"`
KafkaSASLUsername string `env:"KAFKA_SASL_USERNAME" required:"true"`
KafkaSASLPassword string `env:"KAFKA_SASL_PASSWORD" required:"true"`
// KafkaTopicBYOCClientUpdates string `env:"KAFKA_TOPIC_BYOC_CLIENT_UPDATES" required:"true"`
//
// KafkaBrokers string `env:"KAFKA_BROKERS" required:"true"`
// KafkaSASLUsername string `env:"KAFKA_SASL_USERNAME" required:"true"`
// KafkaSASLPassword string `env:"KAFKA_SASL_PASSWORD" required:"true"`
}

func LoadEnv() (*Env, error) {
Expand Down
48 changes: 15 additions & 33 deletions operators/byoc-operator/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ import (
fn "github.com/kloudlite/operator/pkg/functions"
stepResult "github.com/kloudlite/operator/pkg/operator/step-result"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"

redpandaMsvcv1 "github.com/kloudlite/operator/apis/redpanda.msvc/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
)

type Reconciler struct {
Expand Down Expand Up @@ -85,48 +83,22 @@ func (r *Reconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.

req.Object.Status.IsReady = true
req.Object.Status.LastReconcileTime = &metav1.Time{Time: time.Now()}

req.Object.Status.Resources = req.GetOwnedResources()
if err := r.Status().Update(ctx, req.Object); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{RequeueAfter: r.Env.ReconcilePeriod}, nil
return ctrl.Result{RequeueAfter: r.Env.ReconcilePeriod}, r.Status().Update(ctx, req.Object)
}

func (r *Reconciler) finalize(req *rApi.Request[*clusterv1.BYOC]) stepResult.Result {
ctx, obj := req.Context(), req.Object
check := rApi.Check{Generation: obj.Generation}

finalizing := "finalizing"

req.LogPreCheck(finalizing)
defer req.LogPostCheck(finalizing)

rr := req.GetOwnedResources()
for i := range rr {
res := &unstructured.Unstructured{Object: map[string]any{"apiVersion": rr[i].APIVersion, "kind": rr[i].Kind}}

if err := r.Get(ctx, fn.NN(rr[i].Namespace, rr[i].Name), res); err != nil {
if !apiErrors.IsNotFound(err) {
if res.GetDeletionTimestamp() != nil {
if err := r.Delete(ctx, res); err != nil {
return req.CheckFailed(finalizing, check, err.Error())
}
}
req.Logger.Infof("waiting for child resource '[%s, %s] %s/%s' to be deleted", rr[i].APIVersion, rr[i].Kind, rr[i].Namespace, rr[i].Name)
return req.CheckFailed(finalizing, check, err.Error())
}
}
if step := req.CleanupOwnedResources(); !step.ShouldProceed() {
return step
}

// --->
controllerutil.RemoveFinalizer(obj, constants.CommonFinalizer)
if err := r.Update(ctx, obj); err != nil {
return req.CheckFailed(finalizing, check, err.Error())
}

return req.Next()
return req.Finalize()
}

func (r *Reconciler) ensureKafkaTopic(req *rApi.Request[*clusterv1.BYOC]) stepResult.Result {
Expand All @@ -136,7 +108,16 @@ func (r *Reconciler) ensureKafkaTopic(req *rApi.Request[*clusterv1.BYOC]) stepRe
req.LogPreCheck(KafkaTopicExists)
defer req.LogPostCheck(KafkaTopicExists)

kt := &redpandaMsvcv1.Topic{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("kl-%s-%s-incoming", obj.Spec.AccountName, obj.Name), Namespace: r.Env.KafkaTopicNamespace}}
var rpAdminList redpandaMsvcv1.AdminList
if err := r.List(ctx, &rpAdminList); err != nil {
return req.CheckFailed(KafkaTopicExists, check, err.Error()).Err(nil)
}

if len(rpAdminList.Items) != 1 {
return req.CheckFailed(KafkaTopicExists, check, "multiple redpanda admin found, should be only one").Err(nil)
}

kt := &redpandaMsvcv1.Topic{ObjectMeta: metav1.ObjectMeta{Name: obj.Spec.IncomingKafkaTopic, Namespace: r.Env.KafkaTopicNamespace}}
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, kt, func() error {
if !fn.IsOwner(kt, fn.AsOwner(obj)) {
kt.SetOwnerReferences([]metav1.OwnerReference{fn.AsOwner(obj, true)})
Expand All @@ -145,6 +126,7 @@ func (r *Reconciler) ensureKafkaTopic(req *rApi.Request[*clusterv1.BYOC]) stepRe
}
kt.Labels["kloudlite.io/byoc.name"] = obj.Name
}
kt.Spec.RedpandaAdmin = rpAdminList.Items[0].Name
return nil
}); err != nil {
return req.CheckFailed(KafkaTopicExists, check, err.Error()).Err(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
stepResult "github.com/kloudlite/operator/pkg/operator/step-result"
"github.com/kloudlite/operator/pkg/redpanda"
corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -89,7 +90,10 @@ func (r *Reconciler) finalize(req *rApi.Request[*redpandaMsvcv1.Topic]) stepResu

adminCli, err := r.getAdminClient(req)
if err != nil {
return req.CheckFailed(topicDeleted, check, err.Error())
if !apiErrors.IsNotFound(err) {
return req.CheckFailed(topicDeleted, check, err.Error())
}
return req.Finalize()
}

if err := adminCli.DeleteTopic(obj.Name); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (r *Reconciler) ensureNamespacedRBACs(req *rApi.Request[*v1.Project]) stepR

b, err := templates.Parse(
templates.ProjectRBAC, map[string]any{
"namespace": obj.Name,
"namespace": obj.Spec.TargetNamespace,
"role-name": r.Env.AdminRoleName,
"role-binding-name": r.Env.AdminRoleName + "-rb",
"svc-account-name": r.Env.SvcAccountName,
Expand Down
Loading