From 73fd9244c2cf9080c57a3f6a8d11762f80aca069 Mon Sep 17 00:00:00 2001 From: Anders Johnsen Date: Wed, 20 Sep 2023 17:56:43 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20More=20work=20on=20the=20kuberne?= =?UTF-8?q?tes-native=20cluster=20backend?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/rig-server/root.go | 2 +- internal/client/k8s/config_gateway.go | 140 ++++++++++++++++++++------ internal/client/module.go | 3 +- internal/config/types.go | 2 +- internal/service/capsule/rollout.go | 1 + internal/service/cluster/service.go | 2 +- internal/service/operator/operator.go | 16 +-- 7 files changed, 122 insertions(+), 44 deletions(-) diff --git a/cmd/rig-server/root.go b/cmd/rig-server/root.go index 2690738dc..ed28ce69d 100644 --- a/cmd/rig-server/root.go +++ b/cmd/rig-server/root.go @@ -43,7 +43,7 @@ func createRootCMD() *cobra.Command { opts = append(opts, fx.Invoke(func(_ *registry.Server) {})) } - if cfg.Cluster.Type == config.ClusterTypeKubernetes { + if cfg.Cluster.Type == config.ClusterTypeKubernetesNative { opts = append(opts, fx.Invoke(func(_ operator.Service) {})) } diff --git a/internal/client/k8s/config_gateway.go b/internal/client/k8s/config_gateway.go index 7bdcbe827..a417c2669 100644 --- a/internal/client/k8s/config_gateway.go +++ b/internal/client/k8s/config_gateway.go @@ -43,10 +43,6 @@ func newConfigGateway(logger *zap.Logger, restCfg *rest.Config, cs *kubernetes.C } func (g *configGateway) CreateCapsuleConfig(ctx context.Context, cfg *v1alpha1.Capsule) error { - if cfg.Spec.Image == "" { - return nil - } - if err := g.cc.Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: cfg.Namespace}}); err != nil { return checkError(err) } @@ -67,28 +63,12 @@ func (g *configGateway) UpdateCapsuleConfig(ctx context.Context, cfg *v1alpha1.C } func (g *configGateway) ListCapsuleConfigs(ctx context.Context, pagination *model.Pagination) (iterator.Iterator[*v1alpha1.Capsule], int64, error) { - res := &v1alpha1.CapsuleList{} - if err := g.cc.List(ctx, res); err != nil { - return nil, 0, checkError(err) + ls, err := getList(ctx, pagination, g.cc, &v1alpha1.CapsuleList{}) + if err != nil { + return nil, 0, err } - p := iterator.NewProducer[*v1alpha1.Capsule]() - go func() { - defer p.Done() - for _, r := range res.Items { - v := r - if err := p.Value(&v); err != nil { - p.Error(err) - return - } - } - }() - - var c int64 = int64(len(res.Items)) - if res.GetRemainingItemCount() != nil { - c += *res.GetRemainingItemCount() - } - return p, c, nil + return toIterator(ctx, pagination, ls, ls.Items) } func (g *configGateway) GetCapsuleConfig(ctx context.Context, capsuleID string) (*v1alpha1.Capsule, error) { @@ -106,7 +86,23 @@ func (g *configGateway) GetCapsuleConfig(ctx context.Context, capsuleID string) } func (g *configGateway) DeleteCapsuleConfig(ctx context.Context, capsuleID string) error { - return errors.UnimplementedErrorf("unimplemented DeleteCapsuleConfig") + projectID, err := auth.GetProjectID(ctx) + if err != nil { + return err + } + + g.logger.Debug("delete capsule", zap.String("name", capsuleID), zap.String("namespace", projectID.String())) + + if err := g.cc.Delete(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: capsuleID, + Namespace: projectID.String(), + }, + }); err != nil { + return checkError(err) + } + + return nil } func (g *configGateway) SetEnvironmentVariables(ctx context.Context, capsuleID string, envs map[string]string) error { @@ -127,7 +123,17 @@ func (g *configGateway) SetEnvironmentVariables(ctx context.Context, capsuleID s } func (g *configGateway) GetEnvironmentVariables(ctx context.Context, capsuleID string) (map[string]string, error) { - return nil, errors.UnimplementedErrorf("unimplemented GetEnvironmentVariables") + projectID, err := auth.GetProjectID(ctx) + if err != nil { + return nil, err + } + + cf, err := g.GetFile(ctx, capsuleID, capsuleID, projectID.String()) + if err != nil { + return nil, err + } + + return cf.Data, nil } func (g *configGateway) SetEnvironmentVariable(ctx context.Context, capsuleID, name, value string) error { @@ -143,31 +149,61 @@ func (g *configGateway) DeleteEnvironmentVariable(ctx context.Context, capsuleID } func (g *configGateway) GetFile(ctx context.Context, capsuleID, name, namespace string) (*v1.ConfigMap, error) { - return nil, errors.UnimplementedErrorf("unimplemented GetFile") + res := &v1.ConfigMap{} + if err := g.cc.Get(ctx, client.ObjectKey{Name: capsuleID, Namespace: namespace}, res); err != nil { + return nil, checkError(err) + } + + return res, nil } func (g *configGateway) SetFile(ctx context.Context, capsuleID string, file *v1.ConfigMap) error { - return checkError(g.cc.Update(ctx, file)) + return g.upsert(ctx, capsuleID, file) } func (g *configGateway) ListFiles(ctx context.Context, capsuleID string, pagination *model.Pagination) (iterator.Iterator[*v1.ConfigMap], int64, error) { - return nil, 0, errors.UnimplementedErrorf("unimplemented ListFiles") + ls, err := getList(ctx, pagination, g.cc, &v1.ConfigMapList{}) + if err != nil { + return nil, 0, err + } + + return toIterator(ctx, pagination, ls, ls.Items) } func (g *configGateway) DeleteFile(ctx context.Context, capsuleID, name, namespace string) error { - return errors.UnimplementedErrorf("unimplemented DeleteFile") + g.logger.Debug("delete file", zap.String("name", name), zap.String("namespace", namespace)) + if err := g.cc.Delete(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }); err != nil { + return checkError(err) + } + + return nil } func (g *configGateway) GetSecret(ctx context.Context, capsuleID, name, namespace string) (*v1.Secret, error) { - return nil, errors.UnimplementedErrorf("unimplemented GetSecret") + res := &v1.Secret{} + if err := g.cc.Get(ctx, client.ObjectKey{Name: capsuleID, Namespace: namespace}, res); err != nil { + return nil, checkError(err) + } + + return res, nil } func (g *configGateway) SetSecret(ctx context.Context, capsuleID string, secret *v1.Secret) error { - return checkError(g.cc.Update(ctx, secret)) + return g.upsert(ctx, capsuleID, secret) } func (g *configGateway) ListSecrets(ctx context.Context, capsuleID string, pagination *model.Pagination) (iterator.Iterator[*v1.Secret], int64, error) { - return nil, 0, errors.UnimplementedErrorf("unimplemented ListSecrets") + ls, err := getList(ctx, pagination, g.cc, &v1.SecretList{}) + if err != nil { + return nil, 0, err + } + + return toIterator(ctx, pagination, ls, ls.Items) } func (g *configGateway) DeleteSecret(ctx context.Context, capsuleID, name, namespace string) error { @@ -184,6 +220,44 @@ func (g *configGateway) DeleteSecret(ctx context.Context, capsuleID, name, names return nil } +func (g *configGateway) upsert(ctx context.Context, capsuleID string, file client.Object) error { + f := file.DeepCopyObject().(client.Object) + if err := checkError(g.cc.Get(ctx, client.ObjectKeyFromObject(file), f)); errors.IsNotFound(err) { + return checkError(g.cc.Create(ctx, file)) + } else if err != nil { + return err + } + + return checkError(g.cc.Update(ctx, file)) +} + +func getList[L client.ObjectList](ctx context.Context, pagination *model.Pagination, cc client.Client, l L) (L, error) { + projectID, err := auth.GetProjectID(ctx) + if err != nil { + return l, err + } + + if err := cc.List(ctx, l, client.InNamespace(projectID.String())); err != nil { + return l, checkError(err) + } + return l, nil +} + +func toIterator[T any](ctx context.Context, pagination *model.Pagination, ol client.ObjectList, ls []T) (iterator.Iterator[*T], int64, error) { + var ts []*T + for _, i := range ls { + v := i + ts = append(ts, &v) + } + + var c int64 = int64(len(ls)) + if ol.GetRemainingItemCount() != nil { + c += *ol.GetRemainingItemCount() + } + + return iterator.FromList[*T](ts), c, nil +} + func checkError(err error) error { switch apierrors.ReasonForError(err) { case metav1.StatusReasonNotFound: diff --git a/internal/client/module.go b/internal/client/module.go index 37bee4304..a6ad3df13 100644 --- a/internal/client/module.go +++ b/internal/client/module.go @@ -22,7 +22,8 @@ func GetModule(cfg config.Config) fx.Option { switch cfg.Cluster.Type { case config.ClusterTypeDocker: opts = append(opts, fx.Provide(docker.New)) - case config.ClusterTypeKubernetes: + case config.ClusterTypeKubernetes, + config.ClusterTypeKubernetesNative: opts = append(opts, fx.Provide(k8s.New)) } return fx.Module( diff --git a/internal/config/types.go b/internal/config/types.go index ea13c2d34..ac88df5ba 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -70,7 +70,7 @@ type ClientDocker struct { } type ClientKubernetes struct { - WebhooksEnabled bool `mapstructure:"webhooksEnabled"` + WebhooksEnabled bool `mapstructure:"webhooks_enabled"` } type ClientMailjet struct { diff --git a/internal/service/capsule/rollout.go b/internal/service/capsule/rollout.go index c58f71e00..0e1e5b420 100644 --- a/internal/service/capsule/rollout.go +++ b/internal/service/capsule/rollout.go @@ -505,6 +505,7 @@ func (j *rolloutJob) run( }) } + cfg.Spec.Interfaces = nil for _, i := range rc.GetNetwork().GetInterfaces() { capIf := v1alpha1.CapsuleInterface{ Name: i.GetName(), diff --git a/internal/service/cluster/service.go b/internal/service/cluster/service.go index 702f514fe..28b2aca53 100644 --- a/internal/service/cluster/service.go +++ b/internal/service/cluster/service.go @@ -34,7 +34,7 @@ func typeToProto(c config.ClusterType) cluster.ClusterType { switch c { case config.ClusterTypeDocker: return cluster.ClusterType_CLUSTER_TYPE_DOCKER - case config.ClusterTypeKubernetes: + case config.ClusterTypeKubernetes, config.ClusterTypeKubernetesNative: return cluster.ClusterType_CLUSTER_TYPE_KUBERNETES default: return cluster.ClusterType_CLUSTER_TYPE_UNSPECIFIED diff --git a/internal/service/operator/operator.go b/internal/service/operator/operator.go index 4d74cdb54..82e9c6690 100644 --- a/internal/service/operator/operator.go +++ b/internal/service/operator/operator.go @@ -45,7 +45,7 @@ func New(p NewParams) Service { return s } -func (s *service) start() { +func (s *service) start() error { ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel @@ -65,7 +65,7 @@ func (s *service) start() { }) if err != nil { s.log.Error("unable to start manager", zap.Error(err)) - return + return err } //+kubebuilder:scaffold:builder @@ -75,32 +75,34 @@ func (s *service) start() { } if err := cr.SetupWithManager(mgr); err != nil { s.log.Error("unable to setup controller", zap.Error(err)) - return + return err } if s.cfg.Client.Kubernetes.WebhooksEnabled { if err := (&rigdevv1alpha1.Capsule{}).SetupWebhookWithManager(mgr); err != nil { s.log.Error("could not setup webhook with manager", zap.Error(err)) - return + return err } } if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { s.log.Error("unable to set up health check", zap.Error(err)) - return + return err } if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { s.log.Error("unable to set up ready check", zap.Error(err)) - return + return err } go func() { s.log.Info("starting operator service") if err := mgr.Start(ctx); err != nil { - s.log.Error("problem running manager", zap.Error(err)) + s.log.Fatal("problem running manager", zap.Error(err)) return } }() + + return nil } func (s *service) stop() {