diff --git a/deploy/charts/rig-operator/values.yaml b/deploy/charts/rig-operator/values.yaml index 6d8ad1ec9..fd2417ca2 100644 --- a/deploy/charts/rig-operator/values.yaml +++ b/deploy/charts/rig-operator/values.yaml @@ -5,10 +5,11 @@ config: webhooksEnabled: true devModeEnabled: false leaderElectionEnabled: true - prometheusServiceMonitor: - path: "" - portName: "" pipeline: + serviceAccountStep: + plugin: "rigdev.service_account" + deploymentStep: + plugin: "rigdev.deployment" routesStep: plugin: "rigdev.ingress_routes" config: | @@ -16,6 +17,13 @@ config: createCertificateResources: false ingressClassName: "" disableTLS: false + cronJobsStep: + plugin: "rigdev.cron_jobs" + # serviceMonitorStep: + # plugin: "rigdev.service_monitor" + # config: | + # portName: "system" + # path: "metrics" steps: [] # - plugins: diff --git a/docs/docs/api/config/v1alpha1.md b/docs/docs/api/config/v1alpha1.md index 0250c2daf..b6031f1b2 100644 --- a/docs/docs/api/config/v1alpha1.md +++ b/docs/docs/api/config/v1alpha1.md @@ -48,6 +48,21 @@ _Appears in:_ | `enableForPlatform` _boolean_ | If set, will enable the step for the Rig platform which is a Capsule as well | +### CapsuleStep + + + + + +_Appears in:_ +- [Pipeline](#pipeline) + +| Field | Description | +| --- | --- | +| `plugin` _string_ | The plugin to use for handling the capsule step. fx. "rigdev.ingress_routes" for routesStep will create an ingress resource per route. fx. "rigdev.deployment" for deploymentStep will use the default deployment plugin. | +| `config` _string_ | Config is a string defining the plugin-specific configuration of the plugin. | + + ### Client @@ -392,8 +407,6 @@ OperatorConfig is the Schema for the operator config API | `webhooksEnabled` _boolean_ | WebhooksEnabled sets wether or not webhooks should be enabled. When enabled a certificate should be mounted at the webhook server certificate path. Defaults to true if omitted. 
| | `devModeEnabled` _boolean_ | DevModeEnabled enables verbose logs and changes the logging format to be more human readable. | | `leaderElectionEnabled` _boolean_ | LeaderElectionEnabled enables leader election when running multiple instances of the operator. | -| `prometheusServiceMonitor` _[PrometheusServiceMonitor](#prometheusservicemonitor)_ | PrometheusServiceMonitor defines if Rig should spawn a Prometheus ServiceMonitor per capsule for use with a Prometheus Operator stack. | -| `verticalPodAutoscaler` _[VerticalPodAutoscaler](#verticalpodautoscaler)_ | VerticalPodAutoscaler holds the configuration for the VerticalPodAutoscaler resources potentially generated by the operator. | | `pipeline` _[Pipeline](#pipeline)_ | Pipeline defines the capsule controller pipeline | @@ -423,7 +436,12 @@ _Appears in:_ | Field | Description | | --- | --- | -| `routesStep` _[RoutesStep](#routesstep)_ | How to handle the routes for capsules in the cluster. | +| `serviceAccountStep` _[CapsuleStep](#capsulestep)_ | How to handle the service account step of capsules in the cluster. Defaults to rigdev.service_account. | +| `deploymentStep` _[CapsuleStep](#capsulestep)_ | How to handle the deployment step of capsules in the cluster. Defaults to rigdev.deployment. | +| `routesStep` _[CapsuleStep](#capsulestep)_ | How to handle the routes for capsules in the cluster. If left empty, routes will not be handled. | +| `cronJobsStep` _[CapsuleStep](#capsulestep)_ | How to handle the cronjob step of capsules in the cluster. Defaults to rigdev.cron_jobs | +| `vpaStep` _[CapsuleStep](#capsulestep)_ | How to handle the VPA step of capsules in the cluster. If left empty, no VPAs will be created. | +| `serviceMonitorStep` _[CapsuleStep](#capsulestep)_ | How to handle the service monitor step of capsules in the cluster. If left empty, no service monitors will be created. rigdev.service_monitor plugin spawns a Prometheus ServiceMonitor per capsule for use with a Prometheus Operator stack. 
| | `steps` _[Step](#step) array_ | Steps to perform as part of running the operator. | | `customPlugins` _[CustomPlugin](#customplugin) array_ | CustomPlugins enables custom plugins to be injected into the operator. The plugins injected here can then be referenced in 'steps' | @@ -469,19 +487,6 @@ _Appears in:_ | `config` _string_ | Config is a string defining the plugin-specific configuration of the plugin. | -### PrometheusServiceMonitor - - - - - -_Appears in:_ -- [OperatorConfig](#operatorconfig) - -| Field | Description | -| --- | --- | -| `path` _string_ | Path is the path which Prometheus should query on ports. Defaults to /metrics if not set. | -| `portName` _string_ | PortName is the name of the port which Prometheus will query metrics on | ### Repository @@ -499,21 +504,6 @@ _Appears in:_ | `secret` _string_ | Secret is a secret key used for encrypting sensitive data before saving it in the database. | -### RoutesStep - - - - - -_Appears in:_ -- [Pipeline](#pipeline) - -| Field | Description | -| --- | --- | -| `plugin` _string_ | The plugin to use for handling routes in capsule interfaces. If not set, routes will not be handled. fx. "rigdev.ingress_routes" will create an ingress resource per route. | -| `config` _string_ | Config is a string defining the plugin-specific configuration of the plugin. | - - ### SSHCredential @@ -562,18 +552,6 @@ _Appears in:_ | `enableForPlatform` _boolean_ | If set, will enable the step for the Rig platform which is a Capsule as well Deprecated, use Match.EnableForPlatform. 
| -### VerticalPodAutoscaler - - - - -_Appears in:_ -- [OperatorConfig](#operatorconfig) - -| Field | Description | -| --- | --- | -| `enabled` _boolean_ | Enabled enables the creation of a VerticalPodAutoscaler per capsule | diff --git a/docs/docs/operator-manual/plugins/builtin/cron_jobs.mdx b/docs/docs/operator-manual/plugins/builtin/cron_jobs.mdx new file mode 100644 index 000000000..ebeee2255 --- /dev/null +++ b/docs/docs/operator-manual/plugins/builtin/cron_jobs.mdx @@ -0,0 +1,13 @@ +# Cron Jobs Plugin + +The `rigdev.cron_jobs` plugin is the default plugin for handling the jobs specified in the `capsule spec` in the reconciliation pipeline. For each job specified in the capsule spec, if the job is specified by a command, the plugin will create a cron job based on the container of the capsule deployment. Alternatively, if the job is specified by a URL, the plugin will create a cron job that will curl the URL. + +## Config + + + +Configuration for the cron jobs plugin + + + + diff --git a/docs/docs/operator-manual/plugins/builtin/deployment.mdx b/docs/docs/operator-manual/plugins/builtin/deployment.mdx new file mode 100644 index 000000000..9a2689350 --- /dev/null +++ b/docs/docs/operator-manual/plugins/builtin/deployment.mdx @@ -0,0 +1,17 @@ +# Deployment Plugin + +Default plugin for handling deployments in the reconciliation pipeline. Another plugin can be specified in the `deploymentStep` in the pipeline in the operator config. +The `rigdev.deployment` plugin will create a deployment for the capsule, and a service if the capsule has interfaces defined.
+ + + +## Config + + + +Configuration for the deployment plugin + + + + diff --git a/docs/docs/operator-manual/plugins/builtin/ingress_routes.mdx b/docs/docs/operator-manual/plugins/builtin/ingress_routes.mdx index 93388f174..5d040a375 100644 --- a/docs/docs/operator-manual/plugins/builtin/ingress_routes.mdx +++ b/docs/docs/operator-manual/plugins/builtin/ingress_routes.mdx @@ -10,14 +10,13 @@ Config (in context of the rig-operator Helm values): ``` config: pipeline: - steps: - - plugins: - - name: rigdev.ingress_routes - config: | - clusterIssuer: letsencrypt-prod - createCertificateResources: true - ingressClassName: nginx - disableTLS: false + routesStep: + plugin: "rigdev.ingress_routes" + config: | + clusterIssuer: letsencrypt-prod + createCertificateResources: true + ingressClassName: nginx + disableTLS: false ``` ## Config diff --git a/docs/docs/operator-manual/plugins/builtin/service_account.mdx b/docs/docs/operator-manual/plugins/builtin/service_account.mdx new file mode 100644 index 000000000..154a48f03 --- /dev/null +++ b/docs/docs/operator-manual/plugins/builtin/service_account.mdx @@ -0,0 +1,12 @@ +# Service Account Plugin +The `rigdev.service_account` plugin provides the default way of handling service accounts in the reconciliation pipeline. It will create a service account with the name and namespace of the capsule. + +## Config + + + +Configuration for the service account plugin + + + + diff --git a/docs/docs/operator-manual/plugins/builtin/service_monitor.mdx b/docs/docs/operator-manual/plugins/builtin/service_monitor.mdx new file mode 100644 index 000000000..fc516e70c --- /dev/null +++ b/docs/docs/operator-manual/plugins/builtin/service_monitor.mdx @@ -0,0 +1,30 @@ +# Service Monitor Plugin +The `rigdev.service_monitor` plugin spawns a Prometheus ServiceMonitor per capsule +for use with a Prometheus Operator stack.
The service monitor will monitor services with the same name as the capsule and will use the endpoint specified by the `path` and `portName` fields in the configuration. + +## Example + +Config (in the context of the rig-operator Helm values): +``` +config: + pipeline: + serviceMonitorStep: + plugin: "rigdev.service_monitor" + config: | + path: metrics + portName: metricsport +``` + +## Config + + + +Configuration for the service monitor plugin + +| Field | Description | +| --- | --- | +| `Path` _string_ | | +| `PortName` _string_ | | + + + diff --git a/docs/docs/operator-manual/plugins/builtin/vpa.mdx b/docs/docs/operator-manual/plugins/builtin/vpa.mdx new file mode 100644 index 000000000..6983dfbbc --- /dev/null +++ b/docs/docs/operator-manual/plugins/builtin/vpa.mdx @@ -0,0 +1,12 @@ +## Vertical Pod Autoscaler Plugin + + +## Config + + + +Configuration for the VPA plugin + + + + diff --git a/pkg/api/config/v1alpha1/operator_config_types.go b/pkg/api/config/v1alpha1/operator_config_types.go index a4caf7672..41ed849de 100644 --- a/pkg/api/config/v1alpha1/operator_config_types.go +++ b/pkg/api/config/v1alpha1/operator_config_types.go @@ -28,21 +28,31 @@ type OperatorConfig struct { // instances of the operator. LeaderElectionEnabled *bool `json:"leaderElectionEnabled,omitempty"` - // PrometheusServiceMonitor defines if Rig should spawn a Prometheus ServiceMonitor per capsule - // for use with a Prometheus Operator stack. - PrometheusServiceMonitor *PrometheusServiceMonitor `json:"prometheusServiceMonitor,omitempty"` - - // VerticalPodAutoscaler holds the configuration for the VerticalPodAutoscaler resources - // potentially generated by the operator. - VerticalPodAutoscaler VerticalPodAutoscaler `json:"verticalPodAutoscaler,omitempty"` - // Pipeline defines the capsule controller pipeline Pipeline Pipeline `json:"pipeline,omitempty"` } type Pipeline struct { + // How to handle the service account step of capsules in the cluster.
+ // Defaults to rigdev.service_account. + ServiceAccountStep CapsuleStep `json:"serviceAccountStep,omitempty"` + // How to handle the deployment step of capsules in the cluster. + // Defaults to rigdev.deployment. + DeploymentStep CapsuleStep `json:"deploymentStep,omitempty"` // How to handle the routes for capsules in the cluster. - RoutesStep RoutesStep `json:"routesStep,omitempty"` + // If left empty, routes will not be handled. + RoutesStep CapsuleStep `json:"routesStep,omitempty"` + // How to handle the cronjob step of capsules in the cluster. + // Defaults to rigdev.cron_jobs + CronJobsStep CapsuleStep `json:"cronJobsStep,omitempty"` + // How to handle the VPA step of capsules in the cluster. + // If left empty, no VPAs will be created. + VPAStep CapsuleStep `json:"vpaStep,omitempty"` + // How to handle the service monitor step of capsules in the cluster. + // If left empty, no service monitors will be created. + // rigdev.service_monitor plugin spawns a Prometheus ServiceMonitor per capsule + // for use with a Prometheus Operator stack. + ServiceMonitorStep CapsuleStep `json:"serviceMonitorStep,omitempty"` // Steps to perform as part of running the operator. // +patchStrategy=merge Steps []Step `json:"steps,omitempty"` @@ -51,10 +61,10 @@ type Pipeline struct { CustomPlugins []CustomPlugin `json:"customPlugins,omitempty"` } -type RoutesStep struct { - // The plugin to use for handling routes in capsule interfaces. If not set, - // routes will not be handled. - // fx. "rigdev.ingress_routes" will create an ingress resource per route. +type CapsuleStep struct { + // The plugin to use for handling the capsule step. + // fx. "rigdev.ingress_routes" for routesStep will create an ingress resource per route. + // fx. "rigdev.deployment" for deploymentStep will use the default deployment plugin. Plugin string `json:"plugin,omitempty"` // Config is a string defining the plugin-specific configuration of the plugin. 
diff --git a/pkg/api/config/v1alpha1/zz_generated.deepcopy.go b/pkg/api/config/v1alpha1/zz_generated.deepcopy.go index da618157b..e4a095fe3 100644 --- a/pkg/api/config/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/api/config/v1alpha1/zz_generated.deepcopy.go @@ -56,6 +56,21 @@ func (in *CapsuleMatch) DeepCopy() *CapsuleMatch { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CapsuleStep) DeepCopyInto(out *CapsuleStep) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CapsuleStep. +func (in *CapsuleStep) DeepCopy() *CapsuleStep { + if in == nil { + return nil + } + out := new(CapsuleStep) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Client) DeepCopyInto(out *Client) { *out = *in @@ -385,12 +400,6 @@ func (in *OperatorConfig) DeepCopyInto(out *OperatorConfig) { *out = new(bool) **out = **in } - if in.PrometheusServiceMonitor != nil { - in, out := &in.PrometheusServiceMonitor, &out.PrometheusServiceMonitor - *out = new(PrometheusServiceMonitor) - **out = **in - } - out.VerticalPodAutoscaler = in.VerticalPodAutoscaler in.Pipeline.DeepCopyInto(&out.Pipeline) } @@ -430,7 +439,12 @@ func (in *PathPrefixes) DeepCopy() *PathPrefixes { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *Pipeline) DeepCopyInto(out *Pipeline) { *out = *in + out.ServiceAccountStep = in.ServiceAccountStep + out.DeploymentStep = in.DeploymentStep out.RoutesStep = in.RoutesStep + out.CronJobsStep = in.CronJobsStep + out.VPAStep = in.VPAStep + out.ServiceMonitorStep = in.ServiceMonitorStep if in.Steps != nil { in, out := &in.Steps, &out.Steps *out = make([]Step, len(*in)) @@ -544,21 +558,6 @@ func (in *Repository) DeepCopy() *Repository { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *RoutesStep) DeepCopyInto(out *RoutesStep) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RoutesStep. -func (in *RoutesStep) DeepCopy() *RoutesStep { - if in == nil { - return nil - } - out := new(RoutesStep) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *SSHCredential) DeepCopyInto(out *SSHCredential) { *out = *in diff --git a/pkg/controller/capsule_controller.go b/pkg/controller/capsule_controller.go index a9dc83e30..003dc8d32 100644 --- a/pkg/controller/capsule_controller.go +++ b/pkg/controller/capsule_controller.go @@ -60,18 +60,6 @@ type CapsuleReconciler struct { } const ( - AnnotationChecksumFiles = "rig.dev/config-checksum-files" - AnnotationChecksumAutoEnv = "rig.dev/config-checksum-auto-env" - AnnotationChecksumEnv = "rig.dev/config-checksum-env" - AnnotationChecksumSharedEnv = "rig.dev/config-checksum-shared-env" - - AnnotationOverrideOwnership = "rig.dev/override-ownership" - AnnotationPullSecret = "rig.dev/pull-secret" - - LabelSharedConfig = "rig.dev/shared-config" - LabelCapsule = "rig.dev/capsule" - LabelCron = "batch.kubernets.io/cronjob" - fieldFilesConfigMapName = ".spec.files.configMap.name" fieldFilesSecretName = ".spec.files.secret.name" fieldEnvConfigMapName = ".spec.env.from.configMapName" @@ -166,7 +154,7 @@ func (r *CapsuleReconciler) SetupWithManager(mgr ctrl.Manager) error { b = b.Owns(&monitorv1.ServiceMonitor{}) } - if r.Config.VerticalPodAutoscaler.Enabled { + if r.Config.Pipeline.VPAStep.Plugin != "" { b = b.Owns(&vpav1.VerticalPodAutoscaler{}) } @@ -202,7 +190,7 @@ func findCapsulesForConfig(mgr ctrl.Manager) handler.MapFunc { return func(ctx context.Context, o client.Object) []ctrl.Request { var capsulesWithReference v1alpha2.CapsuleList // Queue reconcile for all capsules in namespace if this is a shared config - if sharedConfig := o.GetLabels()[LabelSharedConfig]; sharedConfig == "true" { + if sharedConfig := o.GetLabels()[pipeline.LabelSharedConfig]; sharedConfig == "true" { if err := c.List(ctx, &capsulesWithReference, client.InNamespace(o.GetNamespace())); err != nil { log.Error(err, "could not get capsules") } @@ -348,7 +336,7 @@ func (r *CapsuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } var options []pipeline.CapsuleRequestOption - if v, _ 
:= strconv.ParseBool(capsule.Annotations[AnnotationOverrideOwnership]); v { + if v, _ := strconv.ParseBool(capsule.Annotations[pipeline.AnnotationOverrideOwnership]); v { options = append(options, pipeline.WithForce()) } diff --git a/pkg/controller/plugin/external_plugin.go b/pkg/controller/plugin/external_plugin.go index f1d20a7d1..10303751f 100644 --- a/pkg/controller/plugin/external_plugin.go +++ b/pkg/controller/plugin/external_plugin.go @@ -2,6 +2,8 @@ package plugin import ( "context" + "encoding/json" + "fmt" "io" "os" "os/exec" @@ -18,6 +20,7 @@ import ( "google.golang.org/grpc" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -35,6 +38,7 @@ func newPluginExecutor( name, stepTag, pluginTag, pluginConfig, path string, args []string, logger logr.Logger, + restConfig *rest.Config, ) (*pluginExecutor, error) { tag := stepTag if pluginTag != "" { @@ -48,7 +52,7 @@ func newPluginExecutor( tag: tag, } - return p, p.start(context.Background(), pluginConfig) + return p, p.start(context.Background(), pluginConfig, restConfig) } type loggerSink struct { @@ -63,7 +67,7 @@ func (l *loggerSink) Accept(name string, level hclog.Level, msg string, args ... 
logger.Info(msg) } -func (p *pluginExecutor) start(ctx context.Context, pluginConfig string) error { +func (p *pluginExecutor) start(ctx context.Context, pluginConfig string, restConfig *rest.Config) error { pLogger := hclog.NewInterceptLogger(&hclog.LoggerOptions{ Name: p.name, Output: io.Discard, @@ -101,7 +105,7 @@ func (p *pluginExecutor) start(ctx context.Context, pluginConfig string) error { p.pluginClient = raw.(*pluginClient) - return p.pluginClient.Initialize(ctx, pluginConfig, p.tag) + return p.pluginClient.Initialize(ctx, pluginConfig, p.tag, restConfig) } func (p *pluginExecutor) Stop(context.Context) { @@ -148,10 +152,22 @@ type pluginClient struct { client apiplugin.PluginServiceClient } -func (m *pluginClient) Initialize(ctx context.Context, pluginConfig, tag string) error { - _, err := m.client.Initialize(ctx, &apiplugin.InitializeRequest{ +func (m *pluginClient) Initialize(ctx context.Context, pluginConfig, tag string, restConfig *rest.Config) error { + tlsConfigBytes, err := json.Marshal(restConfig.TLSClientConfig) + if err != nil { + return err + } + + restCfg := &apiplugin.RestConfig{ + Host: restConfig.Host, + BearerToken: restConfig.BearerToken, + TlsConfig: tlsConfigBytes, + } + + _, err = m.client.Initialize(ctx, &apiplugin.InitializeRequest{ PluginConfig: pluginConfig, Tag: tag, + RestConfig: restCfg, }) return err } @@ -212,6 +228,7 @@ func (s requestServer) GetObject( co.SetName(req.GetName()) if req.GetCurrent() { if err := s.req.GetExisting(co); err != nil { + fmt.Println("error getting existing object", err) return nil, err } } else { diff --git a/pkg/controller/plugin/manager.go b/pkg/controller/plugin/manager.go index 711c78f99..25d88b74a 100644 --- a/pkg/controller/plugin/manager.go +++ b/pkg/controller/plugin/manager.go @@ -14,9 +14,11 @@ import ( "github.com/rigdev/rig/pkg/api/config/v1alpha1" "golang.org/x/exp/maps" "gopkg.in/yaml.v3" + "k8s.io/client-go/rest" ) type Manager struct { + restConfig *rest.Config plugins 
map[string]Info builtinBinaryPath string } @@ -74,6 +76,13 @@ func validatePluginName(s string) error { // TODO Find a way to import the names of all plugins here using the map from github.com/rigdev/rig/plugins/allplugins // This creates a dependency cycle var allPlugins = []string{ + "rigdev.service_account", + "rigdev.deployment", + "rigdev.ingress_routes", + "rigdev.cron_jobs", + "rigdev.vpa", + "rigdev.service_monitor", + "rigdev.annotations", "rigdev.datadog", "rigdev.env_mapping", @@ -82,12 +91,12 @@ var allPlugins = []string{ "rigdev.object_template", "rigdev.placement", "rigdev.sidecar", - "rigdev.ingress_routes", } -func NewManager(opts ...ManagerOption) (*Manager, error) { +func NewManager(restCfg *rest.Config, opts ...ManagerOption) (*Manager, error) { manager := &Manager{ - plugins: map[string]Info{}, + restConfig: restCfg, + plugins: map[string]Info{}, } for _, o := range opts { @@ -244,6 +253,7 @@ func (m *Manager) NewStep(step v1alpha1.Step, logger logr.Logger) (*Step, error) plugin.Name, step.Tag, plugin.Tag, plugin.Config, info.BinaryPath, info.Args, logger, + m.restConfig, ) if err != nil { return nil, err diff --git a/pkg/controller/plugin/server.go b/pkg/controller/plugin/server.go index 8ea8dd709..f12d5fac1 100644 --- a/pkg/controller/plugin/server.go +++ b/pkg/controller/plugin/server.go @@ -2,6 +2,7 @@ package plugin import ( "context" + "encoding/json" "fmt" "os" @@ -13,6 +14,7 @@ import ( "github.com/rigdev/rig/pkg/pipeline" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -28,9 +30,35 @@ func (m *GRPCServer) Initialize( _ context.Context, req *apiplugin.InitializeRequest, ) (*apiplugin.InitializeResponse, error) { + var restConfig *rest.Config + var err error + if req.RestConfig == nil { + restConfig, err = rest.InClusterConfig() + if err != nil { + return nil, err + } + } else { + TLSClientConfig := rest.TLSClientConfig{} + if err = 
json.Unmarshal(req.RestConfig.TlsConfig, &TLSClientConfig); err != nil { + return nil, err + } + + restConfig = &rest.Config{ + Host: req.RestConfig.Host, + TLSClientConfig: TLSClientConfig, + BearerToken: req.RestConfig.BearerToken, + } + } + + client, err := client.New(restConfig, client.Options{Scheme: m.scheme}) + if err != nil { + return nil, err + } + if err := m.Impl.Initialize(InitializeRequest{ Config: []byte(req.GetPluginConfig()), Tag: req.GetTag(), + Reader: client, }); err != nil { return nil, err } @@ -252,6 +280,7 @@ type Plugin interface { type InitializeRequest struct { Config []byte Tag string + Reader client.Reader } // StartPlugin starts the plugin so it can listen for requests to be run on a CapsuleRequest diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index aa5edc772..1bff7a791 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -6,10 +6,37 @@ import ( "net/http" "connectrpc.com/connect" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +type Error struct { + e connect.Error +} + +func (e Error) Unwrap() error { + return &e.e +} + +func (e Error) Error() string { + errString := e.e.Error() + return errString +} + +func (e Error) GRPCStatus() *status.Status { + return status.New(codes.Code(e.e.Code()), e.e.Message()) +} + func CodeOf(err error) connect.Code { - return connect.CodeOf(err) + if code := connect.CodeOf(err); code != connect.CodeUnknown { + return code + } + + if code := status.Code(err); code != codes.Unknown { + return connect.Code(code) + } + + return connect.CodeUnknown } func MessageOf(err error) string { @@ -17,9 +44,15 @@ func MessageOf(err error) string { return "" } - if e, ok := err.(*connect.Error); ok { - return e.Message() + var ce *connect.Error + if errors.As(err, &ce) { + return ce.Message() } + + if s, ok := status.FromError(err); ok { + return s.Message() + } + return err.Error() } @@ -28,7 +61,7 @@ func MessageOf(err error) string { // The gRPC framework will generate this 
error code when cancellation // is requested. func IsCanceled(err error) bool { - return connect.CodeOf(err) == connect.CodeCanceled + return CodeOf(err) == connect.CodeCanceled } // IsUnknown error. An example of where this error may be returned is @@ -40,7 +73,7 @@ func IsCanceled(err error) bool { // The gRPC framework will generate this error code in the above two // mentioned cases. func IsUnknown(err error) bool { - return connect.CodeOf(err) == connect.CodeUnknown + return CodeOf(err) == connect.CodeUnknown } // IsInvalidArgument indicates client specified an invalid argument. @@ -50,7 +83,7 @@ func IsUnknown(err error) bool { // // This error code will not be generated by the gRPC framework. func IsInvalidArgument(err error) bool { - return connect.CodeOf(err) == connect.CodeInvalidArgument + return CodeOf(err) == connect.CodeInvalidArgument } // IsDeadlineExceeded means operation expired before completion. @@ -62,7 +95,7 @@ func IsInvalidArgument(err error) bool { // The gRPC framework will generate this error code when the deadline is // exceeded. func IsDeadlineExceeded(err error) bool { - return connect.CodeOf(err) == connect.CodeDeadlineExceeded + return CodeOf(err) == connect.CodeDeadlineExceeded } // IsNotFound means some requested entity (e.g., file or directory) was @@ -70,7 +103,7 @@ func IsDeadlineExceeded(err error) bool { // // This error code will not be generated by the gRPC framework. func IsNotFound(err error) bool { - return connect.CodeOf(err) == connect.CodeNotFound + return CodeOf(err) == connect.CodeNotFound } // IsAlreadyExists means an attempt to create an entity failed because one @@ -78,7 +111,7 @@ func IsNotFound(err error) bool { // // This error code will not be generated by the gRPC framework. 
func IsAlreadyExists(err error) bool { - return connect.CodeOf(err) == connect.CodeAlreadyExists + return CodeOf(err) == connect.CodeAlreadyExists } // IsPermissionDenied indicates the caller does not have permission to @@ -91,7 +124,7 @@ func IsAlreadyExists(err error) bool { // This error code will not be generated by the gRPC core framework, // but expect authentication middleware to use it. func IsPermissionDenied(err error) bool { - return connect.CodeOf(err) == connect.CodePermissionDenied + return CodeOf(err) == connect.CodePermissionDenied } // IsResourceExhausted indicates some resource has been exhausted, perhaps @@ -101,7 +134,7 @@ func IsPermissionDenied(err error) bool { // out-of-memory and server overload situations, or when a message is // larger than the configured maximum size. func IsResourceExhausted(err error) bool { - return connect.CodeOf(err) == connect.CodeResourceExhausted + return CodeOf(err) == connect.CodeResourceExhausted } // IsFailedPrecondition indicates operation was rejected because the @@ -127,7 +160,7 @@ func IsResourceExhausted(err error) bool { // // This error code will not be generated by the gRPC framework. func IsFailedPrecondition(err error) bool { - return connect.CodeOf(err) == connect.CodeFailedPrecondition + return CodeOf(err) == connect.CodeFailedPrecondition } // IsAborted indicates the operation was aborted, typically due to a @@ -139,7 +172,7 @@ func IsFailedPrecondition(err error) bool { // // This error code will not be generated by the gRPC framework. func IsAborted(err error) bool { - return connect.CodeOf(err) == connect.CodeAborted + return CodeOf(err) == connect.CodeAborted } // IsOutOfRange means operation was attempted past the valid range. @@ -160,7 +193,7 @@ func IsAborted(err error) bool { // // This error code will not be generated by the gRPC framework. 
func IsOutOfRange(err error) bool { - return connect.CodeOf(err) == connect.CodeOutOfRange + return CodeOf(err) == connect.CodeOutOfRange } // IsUnimplemented indicates operation is not implemented or not @@ -172,7 +205,7 @@ func IsOutOfRange(err error) bool { // compression algorithms or a disagreement as to whether an RPC should // be streaming. func IsUnimplemented(err error) bool { - return connect.CodeOf(err) == connect.CodeUnimplemented + return CodeOf(err) == connect.CodeUnimplemented } // IsInternal errors. Means some invariants expected by underlying @@ -182,7 +215,7 @@ func IsUnimplemented(err error) bool { // This error code will be generated by the gRPC framework in several // internal error conditions. func IsInternal(err error) bool { - return connect.CodeOf(err) == connect.CodeInternal + return CodeOf(err) == connect.CodeInternal } // IsUnavailable indicates the service is currently unavailable. @@ -196,14 +229,14 @@ func IsInternal(err error) bool { // This error code will be generated by the gRPC framework during // abrupt shutdown of a server process or network connection. func IsUnavailable(err error) bool { - return connect.CodeOf(err) == connect.CodeUnavailable + return CodeOf(err) == connect.CodeUnavailable } // IsDataLoss indicates unrecoverable data loss or corruption. // // This error code will not be generated by the gRPC framework. func IsDataLoss(err error) bool { - return connect.CodeOf(err) == connect.CodeDataLoss + return CodeOf(err) == connect.CodeDataLoss } // IsUnauthenticated indicates the request does not have valid @@ -213,7 +246,7 @@ func IsDataLoss(err error) bool { // authentication metadata is invalid or a Credentials callback fails, // but also expect authentication middleware to generate it. 
func IsUnauthenticated(err error) bool { - return connect.CodeOf(err) == connect.CodeUnauthenticated + return CodeOf(err) == connect.CodeUnauthenticated } // CanceledErrorf creates a new error that indicates the operation was canceled (typically by the caller). @@ -221,7 +254,7 @@ func IsUnauthenticated(err error) bool { // The gRPC framework will generate this error code when cancellation // is requested. func CanceledErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeCanceled, fmt.Errorf(format, a...)) + return NewError(connect.CodeCanceled, fmt.Errorf(format, a...)) } // UnknownErrorf creates a new error that error. An example of where this error may be returned is @@ -233,7 +266,7 @@ func CanceledErrorf(format string, a ...interface{}) error { // The gRPC framework will generate this error code in the above two // mentioned cases. func UnknownErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeUnknown, fmt.Errorf(format, a...)) + return NewError(connect.CodeUnknown, fmt.Errorf(format, a...)) } // InvalidArgumentErrorf creates a new error that indicates client specified an invalid argument. @@ -243,7 +276,7 @@ func UnknownErrorf(format string, a ...interface{}) error { // // This error code will not be generated by the gRPC framework. func InvalidArgumentErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf(format, a...)) + return NewError(connect.CodeInvalidArgument, fmt.Errorf(format, a...)) } // DeadlineExceededErrorf creates a new error that means operation expired before completion. @@ -255,7 +288,7 @@ func InvalidArgumentErrorf(format string, a ...interface{}) error { // The gRPC framework will generate this error code when the deadline is // exceeded. 
func DeadlineExceededErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeDeadlineExceeded, fmt.Errorf(format, a...)) + return NewError(connect.CodeDeadlineExceeded, fmt.Errorf(format, a...)) } // NotFoundErrorf creates a new error that means some requested entity (e.g., file or directory) was @@ -263,7 +296,7 @@ func DeadlineExceededErrorf(format string, a ...interface{}) error { // // This error code will not be generated by the gRPC framework. func NotFoundErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeNotFound, fmt.Errorf(format, a...)) + return NewError(connect.CodeNotFound, fmt.Errorf(format, a...)) } // AlreadyExistsErrorf creates a new error that means an attempt to create an entity failed because one @@ -271,7 +304,7 @@ func NotFoundErrorf(format string, a ...interface{}) error { // // This error code will not be generated by the gRPC framework. func AlreadyExistsErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeAlreadyExists, fmt.Errorf(format, a...)) + return NewError(connect.CodeAlreadyExists, fmt.Errorf(format, a...)) } // PermissionDeniedErrorf creates a new error that indicates the caller does not have permission to @@ -284,7 +317,7 @@ func AlreadyExistsErrorf(format string, a ...interface{}) error { // This error code will not be generated by the gRPC core framework, // but expect authentication middleware to use it. func PermissionDeniedErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodePermissionDenied, fmt.Errorf(format, a...)) + return NewError(connect.CodePermissionDenied, fmt.Errorf(format, a...)) } // ResourceExhaustedErrorf creates a new error that indicates some resource has been exhausted, perhaps @@ -294,7 +327,7 @@ func PermissionDeniedErrorf(format string, a ...interface{}) error { // out-of-memory and server overload situations, or when a message is // larger than the configured maximum size. 
func ResourceExhaustedErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeResourceExhausted, fmt.Errorf(format, a...)) + return NewError(connect.CodeResourceExhausted, fmt.Errorf(format, a...)) } // FailedPreconditionErrorf creates a new error that indicates operation was rejected because the @@ -320,7 +353,7 @@ func ResourceExhaustedErrorf(format string, a ...interface{}) error { // // This error code will not be generated by the gRPC framework. func FailedPreconditionErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf(format, a...)) + return NewError(connect.CodeFailedPrecondition, fmt.Errorf(format, a...)) } // AbortedErrorf creates a new error that indicates the operation was aborted, typically due to a @@ -332,7 +365,7 @@ func FailedPreconditionErrorf(format string, a ...interface{}) error { // // This error code will not be generated by the gRPC framework. func AbortedErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeAborted, fmt.Errorf(format, a...)) + return NewError(connect.CodeAborted, fmt.Errorf(format, a...)) } // OutOfRangeErrorf creates a new error that means operation was attempted past the valid range. @@ -353,7 +386,7 @@ func AbortedErrorf(format string, a ...interface{}) error { // // This error code will not be generated by the gRPC framework. func OutOfRangeErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeOutOfRange, fmt.Errorf(format, a...)) + return NewError(connect.CodeOutOfRange, fmt.Errorf(format, a...)) } // UnimplementedErrorf creates a new error that indicates operation is not implemented or not @@ -365,7 +398,7 @@ func OutOfRangeErrorf(format string, a ...interface{}) error { // compression algorithms or a disagreement as to whether an RPC should // be streaming. 
func UnimplementedErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeUnimplemented, fmt.Errorf(format, a...)) + return NewError(connect.CodeUnimplemented, fmt.Errorf(format, a...)) } // InternalErrorf creates a new error that errors. Means some invariants expected by underlying @@ -375,7 +408,7 @@ func UnimplementedErrorf(format string, a ...interface{}) error { // This error code will be generated by the gRPC framework in several // internal error conditions. func InternalErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeInternal, fmt.Errorf(format, a...)) + return NewError(connect.CodeInternal, fmt.Errorf(format, a...)) } // UnavailableErrorf creates a new error that indicates the service is currently unavailable. @@ -389,14 +422,14 @@ func InternalErrorf(format string, a ...interface{}) error { // This error code will be generated by the gRPC framework during // abrupt shutdown of a server process or network connection. func UnavailableErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeUnavailable, fmt.Errorf(format, a...)) + return NewError(connect.CodeUnavailable, fmt.Errorf(format, a...)) } // DataLossErrorf creates a new error that indicates unrecoverable data loss or corruption. // // This error code will not be generated by the gRPC framework. func DataLossErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeDataLoss, fmt.Errorf(format, a...)) + return NewError(connect.CodeDataLoss, fmt.Errorf(format, a...)) } // UnauthenticatedErrorf creates a new error that indicates the request does not have valid @@ -406,7 +439,15 @@ func DataLossErrorf(format string, a ...interface{}) error { // authentication metadata is invalid or a Credentials callback fails, // but also expect authentication middleware to generate it. 
func UnauthenticatedErrorf(format string, a ...interface{}) error { - return connect.NewError(connect.CodeUnauthenticated, fmt.Errorf(format, a...)) + return NewError(connect.CodeUnauthenticated, fmt.Errorf(format, a...)) +} + +func NewError(c connect.Code, underlying error) error { + e := connect.NewError(c, underlying) + + return Error{ + e: *e, + } } func ToHTTP(err error) int { diff --git a/pkg/errors/grpc_test.go b/pkg/errors/grpc_test.go new file mode 100644 index 000000000..296848a58 --- /dev/null +++ b/pkg/errors/grpc_test.go @@ -0,0 +1,33 @@ +package errors + +import ( + "testing" + + "connectrpc.com/connect" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func Test_Grpc_Errors(t *testing.T) { + e := NotFoundErrorf("test") + require.True(t, IsNotFound(e)) + require.Equal(t, codes.NotFound, status.Code(e)) + require.Equal(t, connect.CodeNotFound, connect.CodeOf(e)) + require.Equal(t, "test", MessageOf(e)) + require.Equal(t, "not_found: test", e.Error()) + + connectErr := connect.NewError(connect.CodeNotFound, e) + require.Equal(t, codes.NotFound, status.Code(connectErr)) + require.True(t, IsNotFound(connectErr)) + require.Equal(t, connect.CodeNotFound, connect.CodeOf(connectErr)) + require.Equal(t, "not_found: test", MessageOf(connectErr)) + require.Equal(t, "not_found: not_found: test", connectErr.Error()) + + grpcError := status.Error(codes.NotFound, e.Error()) + require.Equal(t, codes.NotFound, status.Code(grpcError)) + require.True(t, IsNotFound(grpcError)) + require.Equal(t, "not_found: test", MessageOf(grpcError)) + require.Equal(t, "rpc error: code = NotFound desc = not_found: test", grpcError.Error()) + +} diff --git a/pkg/pipeline/capsule_request.go b/pkg/pipeline/capsule_request.go index d9392be49..6615f3472 100644 --- a/pkg/pipeline/capsule_request.go +++ b/pkg/pipeline/capsule_request.go @@ -17,6 +17,20 @@ import ( const ( LabelOwnedByCapsule = "rig.dev/owned-by-capsule" + + 
AnnotationOverrideOwnership = "rig.dev/override-ownership" + AnnotationPullSecret = "rig.dev/pull-secret" + + LabelSharedConfig = "rig.dev/shared-config" + LabelCapsule = "rig.dev/capsule" + LabelCron = "batch.kubernets.io/cronjob" + + RigDevRolloutLabel = "rig.dev/rollout" + + AnnotationChecksumFiles = "rig.dev/config-checksum-files" + AnnotationChecksumAutoEnv = "rig.dev/config-checksum-auto-env" + AnnotationChecksumEnv = "rig.dev/config-checksum-env" + AnnotationChecksumSharedEnv = "rig.dev/config-checksum-shared-env" ) // CapsuleRequest contains a single reconcile request for a given capsule. diff --git a/pkg/pipeline/helpers.go b/pkg/pipeline/helpers.go new file mode 100644 index 000000000..0e09b7106 --- /dev/null +++ b/pkg/pipeline/helpers.go @@ -0,0 +1,104 @@ +package pipeline + +import ( + "path" + "strings" + + "github.com/rigdev/rig/pkg/api/v1alpha2" + v1 "k8s.io/api/core/v1" +) + +var _defaultPodLabels = []string{RigDevRolloutLabel} + +func CreatePodAnnotations(req CapsuleRequest) map[string]string { + podAnnotations := map[string]string{} + for _, l := range _defaultPodLabels { + if v, ok := req.Capsule().Annotations[l]; ok { + podAnnotations[l] = v + } + } + return podAnnotations +} + +func FilesToVolumes(files []v1alpha2.File) ([]v1.Volume, []v1.VolumeMount) { + var volumes []v1.Volume + var mounts []v1.VolumeMount + for _, f := range files { + volume, mount := FileToVolume(f) + volumes = append(volumes, volume) + mounts = append(mounts, mount) + } + return volumes, mounts +} + +func FileToVolume(f v1alpha2.File) (v1.Volume, v1.VolumeMount) { + var volume v1.Volume + var mount v1.VolumeMount + var name string + switch f.Ref.Kind { + case "ConfigMap": + name = "configmap-" + strings.ReplaceAll(f.Ref.Name, ".", "-") + volume = v1.Volume{ + Name: name, + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: f.Ref.Name, + }, + Items: []v1.KeyToPath{ + { + Key: f.Ref.Key, + Path: 
path.Base(f.Path), + }, + }, + }, + }, + } + case "Secret": + name = "secret-" + strings.ReplaceAll(f.Ref.Name, ".", "-") + volume = v1.Volume{ + Name: name, + VolumeSource: v1.VolumeSource{ + Secret: &v1.SecretVolumeSource{ + SecretName: f.Ref.Name, + Items: []v1.KeyToPath{ + { + Key: f.Ref.Key, + Path: path.Base(f.Path), + }, + }, + }, + }, + } + } + if name != "" { + mount = v1.VolumeMount{ + Name: name, + MountPath: f.Path, + SubPath: path.Base(f.Path), + } + } + + return volume, mount +} + +func EnvSources(refs []v1alpha2.EnvReference) []v1.EnvFromSource { + var res []v1.EnvFromSource + for _, e := range refs { + switch e.Kind { + case "ConfigMap": + res = append(res, v1.EnvFromSource{ + ConfigMapRef: &v1.ConfigMapEnvSource{ + LocalObjectReference: v1.LocalObjectReference{Name: e.Name}, + }, + }) + case "Secret": + res = append(res, v1.EnvFromSource{ + SecretRef: &v1.SecretEnvSource{ + LocalObjectReference: v1.LocalObjectReference{Name: e.Name}, + }, + }) + } + } + return res +} diff --git a/pkg/service/pipeline/default_pipeline.go b/pkg/service/pipeline/default_pipeline.go index 48d5079d9..56458269f 100644 --- a/pkg/service/pipeline/default_pipeline.go +++ b/pkg/service/pipeline/default_pipeline.go @@ -7,12 +7,14 @@ import ( "github.com/rigdev/rig/pkg/api/config/v1alpha1" "github.com/rigdev/rig/pkg/controller/plugin" "github.com/rigdev/rig/pkg/pipeline" - "github.com/rigdev/rig/pkg/service/capabilities" + "github.com/rigdev/rig/plugins/cron_jobs" + "github.com/rigdev/rig/plugins/deployment" + "github.com/rigdev/rig/plugins/service_account" "k8s.io/apimachinery/pkg/runtime" ) func (s *service) initializePipeline(ctx context.Context) error { - p, err := CreateDefaultPipeline(ctx, s.client.Scheme(), s.capSvc, s.cfg, s.pluginManager, s.logger) + p, err := CreateDefaultPipeline(ctx, s.client.Scheme(), s.cfg, s.pluginManager, s.logger) if err != nil { return err } @@ -24,12 +26,11 @@ func (s *service) initializePipeline(ctx context.Context) error { func 
CreateDefaultPipeline( ctx context.Context, scheme *runtime.Scheme, - capSvc capabilities.Service, cfg *v1alpha1.OperatorConfig, pluginManager *plugin.Manager, logger logr.Logger, ) (*pipeline.CapsulePipeline, error) { - steps, err := GetDefaultPipelineSteps(ctx, capSvc, cfg, pluginManager, logger) + steps, err := GetDefaultPipelineSteps(ctx, cfg, pluginManager, logger) if err != nil { return nil, err } @@ -52,28 +53,49 @@ func CreateDefaultPipeline( } func GetDefaultPipelineSteps( - ctx context.Context, - capSvc capabilities.Service, + _ context.Context, cfg *v1alpha1.OperatorConfig, pluginManager *plugin.Manager, logger logr.Logger, ) ([]pipeline.Step[pipeline.CapsuleRequest], error) { - capabilities, err := capSvc.Get(ctx) + serviceAccountPlugin := service_account.Name + if cfg.Pipeline.ServiceAccountStep.Plugin != "" { + serviceAccountPlugin = cfg.Pipeline.ServiceAccountStep.Plugin + } + serviceAccountStep, err := NewCapsulePluginStep(serviceAccountPlugin, + cfg.Pipeline.ServiceAccountStep.Config, pluginManager, logger) if err != nil { return nil, err } - var steps []pipeline.Step[pipeline.CapsuleRequest] + deploymentPlugin := deployment.Name + if cfg.Pipeline.DeploymentStep.Plugin != "" { + deploymentPlugin = cfg.Pipeline.DeploymentStep.Plugin + } + deploymentStep, err := NewCapsulePluginStep(deploymentPlugin, + cfg.Pipeline.DeploymentStep.Config, pluginManager, logger) + if err != nil { + return nil, err + } - steps = append(steps, - NewServiceAccountStep(), - NewDeploymentStep(), - NewVPAStep(cfg), - NewNetworkStep(cfg), - ) + steps := []pipeline.Step[pipeline.CapsuleRequest]{ + serviceAccountStep, + deploymentStep, + } + + if cfg.Pipeline.VPAStep.Plugin != "" { + vpaStep, err := NewCapsulePluginStep(cfg.Pipeline.VPAStep.Plugin, + cfg.Pipeline.VPAStep.Config, pluginManager, logger) + if err != nil { + return nil, err + } + + steps = append(steps, vpaStep) + } if cfg.Pipeline.RoutesStep.Plugin != "" { - routesStep, err := NewRoutesStep(cfg, pluginManager, 
logger) + routesStep, err := NewCapsulePluginStep(cfg.Pipeline.RoutesStep.Plugin, + cfg.Pipeline.RoutesStep.Config, pluginManager, logger) if err != nil { return nil, err } @@ -81,27 +103,43 @@ func GetDefaultPipelineSteps( steps = append(steps, routesStep) } + cronJobsPlugin := cron_jobs.Name + if cfg.Pipeline.CronJobsStep.Plugin != "" { + cronJobsPlugin = cfg.Pipeline.CronJobsStep.Plugin + } + cronjobStep, err := NewCapsulePluginStep(cronJobsPlugin, + cfg.Pipeline.CronJobsStep.Config, pluginManager, logger) + if err != nil { + return nil, err + } steps = append(steps, - NewCronJobStep(), + cronjobStep, ) - if capabilities.GetHasPrometheusServiceMonitor() { - steps = append(steps, NewServiceMonitorStep(cfg)) + if cfg.Pipeline.ServiceMonitorStep.Plugin != "" { + serviceMonitorStep, err := NewCapsulePluginStep(cfg.Pipeline.ServiceMonitorStep.Plugin, + cfg.Pipeline.ServiceMonitorStep.Config, pluginManager, logger) + if err != nil { + return nil, err + } + + steps = append(steps, serviceMonitorStep) } return steps, nil } -func NewRoutesStep(cfg *v1alpha1.OperatorConfig, +func NewCapsulePluginStep( + pluginName, pluginConfig string, pluginManager *plugin.Manager, logger logr.Logger, ) (pipeline.Step[pipeline.CapsuleRequest], error) { - routesStep, err := pluginManager.NewStep(v1alpha1.Step{ + pluginStep, err := pluginManager.NewStep(v1alpha1.Step{ EnableForPlatform: true, Plugins: []v1alpha1.Plugin{ { - Name: cfg.Pipeline.RoutesStep.Plugin, - Config: cfg.Pipeline.RoutesStep.Config, + Name: pluginName, + Config: pluginConfig, }, }, }, logger) @@ -109,5 +147,5 @@ func NewRoutesStep(cfg *v1alpha1.OperatorConfig, return nil, err } - return routesStep, nil + return pluginStep, nil } diff --git a/pkg/service/pipeline/deployment_step_test.go b/pkg/service/pipeline/deployment_step_test.go deleted file mode 100644 index 167a0f147..000000000 --- a/pkg/service/pipeline/deployment_step_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package pipeline - -import ( - "context" - "testing" - - 
"github.com/go-logr/logr" - "github.com/rigdev/rig/pkg/api/config/v1alpha1" - "github.com/rigdev/rig/pkg/api/v1alpha2" - "github.com/rigdev/rig/pkg/pipeline" - "github.com/rigdev/rig/pkg/ptr" - "github.com/rigdev/rig/pkg/roclient" - "github.com/rigdev/rig/pkg/scheme" - "github.com/stretchr/testify/require" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -func TestReusePodSelectors(t *testing.T) { - current := &appsv1.Deployment{ - TypeMeta: v1.TypeMeta{ - Kind: "Deployment", - APIVersion: "apps/v1", - }, - ObjectMeta: v1.ObjectMeta{ - Name: "foobar", - Namespace: "my-ns", - }, - Spec: appsv1.DeploymentSpec{ - Selector: &v1.LabelSelector{ - MatchLabels: map[string]string{ - "app.kubernetes.io/name": "foobar", - }, - }, - }, - } - - r := roclient.NewReader(scheme.New()) - require.NoError(t, r.AddObject(current)) - - p := pipeline.NewCapsulePipeline(&v1alpha1.OperatorConfig{}, scheme.New(), logr.Discard()) - p.AddStep(NewDeploymentStep()) - c := &v1alpha2.Capsule{ObjectMeta: v1.ObjectMeta{ - Name: "foobar", - Namespace: "my-ns", - }, Status: &v1alpha2.CapsuleStatus{ - OwnedResources: []v1alpha2.OwnedResource{ - { - Ref: &corev1.TypedLocalObjectReference{ - APIGroup: ptr.New(current.GetObjectKind().GroupVersionKind().Group), - Kind: current.Kind, - Name: current.Name, - }, - }, - }, - }} - res, err := p.RunCapsule(context.Background(), c, testClient{r: r}) - require.NoError(t, err) - for _, o := range res.OutputObjects { - if dep, ok := o.Object.(*appsv1.Deployment); ok { - require.Equal(t, map[string]string{ - "app.kubernetes.io/name": "foobar", - }, dep.Spec.Selector.MatchLabels) - } - } -} - -type testClient struct { - r client.Reader - client.Client -} - -func (c testClient) Get( - ctx context.Context, - key types.NamespacedName, - obj client.Object, - opts ...client.GetOption, -) error { - return c.r.Get(ctx, key, obj, 
opts...) -} - -func (c testClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - return c.r.List(ctx, list, opts...) -} diff --git a/pkg/service/pipeline/network_step.go b/pkg/service/pipeline/network_step.go deleted file mode 100644 index b39904abc..000000000 --- a/pkg/service/pipeline/network_step.go +++ /dev/null @@ -1,160 +0,0 @@ -package pipeline - -import ( - "context" - "fmt" - - "github.com/rigdev/rig/pkg/api/config/v1alpha1" - "github.com/rigdev/rig/pkg/errors" - "github.com/rigdev/rig/pkg/pipeline" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" -) - -type NetworkStep struct { - cfg *v1alpha1.OperatorConfig -} - -func NewNetworkStep(cfg *v1alpha1.OperatorConfig) *NetworkStep { - return &NetworkStep{ - cfg: cfg, - } -} - -func (s *NetworkStep) Apply(_ context.Context, req pipeline.CapsuleRequest) error { - // If no interfaces are defined, no changes are needed. - if len(req.Capsule().Spec.Interfaces) == 0 { - return nil - } - - deployment := &appsv1.Deployment{} - if err := req.GetNew(deployment); errors.IsNotFound(err) { - // We assume service and ingress are not needed if the deployment doesn't exist. 
- return nil - } else if err != nil { - return err - } - - for i, container := range deployment.Spec.Template.Spec.Containers { - if container.Name != req.Capsule().Name { - continue - } - - var ports []corev1.ContainerPort - for _, ni := range req.Capsule().Spec.Interfaces { - ports = append(ports, corev1.ContainerPort{ - Name: ni.Name, - ContainerPort: ni.Port, - }) - - if ni.Liveness != nil { - container.LivenessProbe = &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: ni.Liveness.Path, - Port: intstr.FromInt32(ni.Port), - }, - }, - } - } - - if ni.Readiness != nil { - container.ReadinessProbe = &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: ni.Readiness.Path, - Port: intstr.FromInt32(ni.Port), - }, - }, - } - } - } - container.Ports = ports - deployment.Spec.Template.Spec.Containers[i] = container - } - - if err := req.Set(deployment); err != nil { - return err - } - - if err := req.Set(s.createService(req)); err != nil { - return err - } - - if capsuleHasLoadBalancer(req) { - lb := s.createLoadBalancer(req) - if err := req.Set(lb); err != nil { - return err - } - } - - return nil -} - -func (s *NetworkStep) createService(req pipeline.CapsuleRequest) *corev1.Service { - svc := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: req.Capsule().Name, - Namespace: req.Capsule().Namespace, - Labels: map[string]string{ - LabelCapsule: req.Capsule().Name, - }, - }, - Spec: corev1.ServiceSpec{ - Selector: map[string]string{ - LabelCapsule: req.Capsule().Name, - }, - }, - } - - for _, inf := range req.Capsule().Spec.Interfaces { - svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{ - Name: inf.Name, - Port: inf.Port, - TargetPort: intstr.FromString(inf.Name), - }) - } - - return svc -} - -func (s *NetworkStep) createLoadBalancer(req pipeline.CapsuleRequest) *corev1.Service { - svc := &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - }, - ObjectMeta: 
metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-lb", req.Capsule().Name), - Namespace: req.Capsule().Namespace, - }, - Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeLoadBalancer, - Selector: map[string]string{ - LabelCapsule: req.Capsule().Name, - }, - }, - } - - for _, inf := range req.Capsule().Spec.Interfaces { - if inf.Public != nil && inf.Public.LoadBalancer != nil { - svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{ - Name: inf.Name, - Port: inf.Public.LoadBalancer.Port, - TargetPort: intstr.FromString(inf.Name), - }) - } - } - - return svc -} - -func capsuleHasLoadBalancer(req pipeline.CapsuleRequest) bool { - for _, inf := range req.Capsule().Spec.Interfaces { - if inf.Public != nil && inf.Public.LoadBalancer != nil { - return true - } - } - return false -} diff --git a/pkg/service/pipeline/service.go b/pkg/service/pipeline/service.go index d0c0427d1..8535cfb35 100644 --- a/pkg/service/pipeline/service.go +++ b/pkg/service/pipeline/service.go @@ -87,7 +87,7 @@ func (s *service) DryRun( spec.SetUID(types.UID("dry-run-spec")) } - steps, err := GetDefaultPipelineSteps(ctx, s.capSvc, cfg, s.pluginManager, s.logger) + steps, err := GetDefaultPipelineSteps(ctx, cfg, s.pluginManager, s.logger) if err != nil { return nil, err } diff --git a/pkg/service/pipeline/service_account_step.go b/pkg/service/pipeline/service_account_step.go deleted file mode 100644 index 580c2e7c5..000000000 --- a/pkg/service/pipeline/service_account_step.go +++ /dev/null @@ -1,35 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/rigdev/rig/pkg/pipeline" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -type ServiceAccountStep struct{} - -func NewServiceAccountStep() *ServiceAccountStep { - return &ServiceAccountStep{} -} - -func (s *ServiceAccountStep) Apply(_ context.Context, req pipeline.CapsuleRequest) error { - sa := s.createServiceAccount(req) - return req.Set(sa) -} - -func (s *ServiceAccountStep) createServiceAccount(req 
pipeline.CapsuleRequest) *corev1.ServiceAccount { - sa := &corev1.ServiceAccount{ - TypeMeta: metav1.TypeMeta{ - Kind: "ServiceAccount", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: req.Capsule().Name, - Namespace: req.Capsule().Namespace, - }, - } - - return sa -} diff --git a/pkg/service/pipeline/service_monitor_step.go b/pkg/service/pipeline/service_monitor_step.go deleted file mode 100644 index 5c9811e53..000000000 --- a/pkg/service/pipeline/service_monitor_step.go +++ /dev/null @@ -1,71 +0,0 @@ -package pipeline - -import ( - "context" - - monitorv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - "github.com/rigdev/rig/pkg/api/config/v1alpha1" - "github.com/rigdev/rig/pkg/pipeline" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -const ( - AnnotationChecksumFiles = "rig.dev/config-checksum-files" - AnnotationChecksumAutoEnv = "rig.dev/config-checksum-auto-env" - AnnotationChecksumEnv = "rig.dev/config-checksum-env" - AnnotationChecksumSharedEnv = "rig.dev/config-checksum-shared-env" - - AnnotationOverrideOwnership = "rig.dev/override-ownership" - AnnotationPullSecret = "rig.dev/pull-secret" - - LabelSharedConfig = "rig.dev/shared-config" - LabelCapsule = "rig.dev/capsule" - LabelCron = "batch.kubernets.io/cronjob" -) - -type ServiceMonitorStep struct { - cfg *v1alpha1.OperatorConfig -} - -func NewServiceMonitorStep(cfg *v1alpha1.OperatorConfig) *ServiceMonitorStep { - return &ServiceMonitorStep{ - cfg: cfg, - } -} - -func (s *ServiceMonitorStep) Apply(_ context.Context, req pipeline.CapsuleRequest) error { - if s.cfg.PrometheusServiceMonitor == nil || s.cfg.PrometheusServiceMonitor.PortName == "" { - return nil - } - - serviceMonitor := s.createPrometheusServiceMonitor(req) - return req.Set(serviceMonitor) -} - -func (s *ServiceMonitorStep) createPrometheusServiceMonitor(req pipeline.CapsuleRequest) *monitorv1.ServiceMonitor { - return &monitorv1.ServiceMonitor{ - TypeMeta: metav1.TypeMeta{ - Kind: 
"ServiceMonitor", - APIVersion: "monitoring.coreos.com/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: req.Capsule().Name, - Namespace: req.Capsule().Namespace, - ResourceVersion: "", - Labels: map[string]string{ - LabelCapsule: req.Capsule().Name, - }, - }, - Spec: monitorv1.ServiceMonitorSpec{ - Selector: metav1.LabelSelector{ - MatchLabels: map[string]string{ - LabelCapsule: req.Capsule().Name, - }, - }, - Endpoints: []monitorv1.Endpoint{{ - Port: s.cfg.PrometheusServiceMonitor.PortName, - Path: s.cfg.PrometheusServiceMonitor.Path, - }}, - }, - } -} diff --git a/plugins/allplugins/all.go b/plugins/allplugins/all.go index 3ba92b89d..7e8b3a80f 100644 --- a/plugins/allplugins/all.go +++ b/plugins/allplugins/all.go @@ -3,24 +3,34 @@ package allplugins import ( "github.com/rigdev/rig/pkg/controller/plugin" "github.com/rigdev/rig/plugins/annotations" + "github.com/rigdev/rig/plugins/cron_jobs" "github.com/rigdev/rig/plugins/datadog" + "github.com/rigdev/rig/plugins/deployment" envmapping "github.com/rigdev/rig/plugins/env_mapping" googlesqlproxy "github.com/rigdev/rig/plugins/google_cloud_sql_auth_proxy" ingressroutes "github.com/rigdev/rig/plugins/ingress_routes" initcontainer "github.com/rigdev/rig/plugins/init_container" objecttemplate "github.com/rigdev/rig/plugins/object_template" "github.com/rigdev/rig/plugins/placement" + "github.com/rigdev/rig/plugins/service_account" + "github.com/rigdev/rig/plugins/service_monitor" "github.com/rigdev/rig/plugins/sidecar" + "github.com/rigdev/rig/plugins/vpa" ) var Plugins = map[string]plugin.Plugin{ - annotations.Name: &annotations.Plugin{}, - datadog.Name: &datadog.Plugin{}, - envmapping.Name: &envmapping.Plugin{}, - googlesqlproxy.Name: &googlesqlproxy.Plugin{}, - initcontainer.Name: &initcontainer.Plugin{}, - objecttemplate.Name: &objecttemplate.Plugin{}, - placement.Name: &placement.Plugin{}, - sidecar.Name: &sidecar.Plugin{}, - ingressroutes.Name: &ingressroutes.Plugin{}, + annotations.Name: &annotations.Plugin{}, + 
datadog.Name: &datadog.Plugin{}, + envmapping.Name: &envmapping.Plugin{}, + googlesqlproxy.Name: &googlesqlproxy.Plugin{}, + initcontainer.Name: &initcontainer.Plugin{}, + objecttemplate.Name: &objecttemplate.Plugin{}, + placement.Name: &placement.Plugin{}, + sidecar.Name: &sidecar.Plugin{}, + ingressroutes.Name: &ingressroutes.Plugin{}, + deployment.Name: &deployment.Plugin{}, + cron_jobs.Name: &cron_jobs.Plugin{}, + service_account.Name: &service_account.Plugin{}, + service_monitor.Name: &service_monitor.Plugin{}, + vpa.Name: &vpa.Plugin{}, } diff --git a/plugins/cron_jobs/README.md b/plugins/cron_jobs/README.md new file mode 100644 index 000000000..ebeee2255 --- /dev/null +++ b/plugins/cron_jobs/README.md @@ -0,0 +1,13 @@ +# Cron Jobs Plugin + +The `rigdev.cron_jobs` plugin is the default plugin for handling the jobs specified in the `capsule spec` in the reconcilliation pipeline. For each job specified in the capsule spec, if the job is specified by a command, the plugin will create a cron job based on the container of the capsule deployment. Alternatively, if the job is specified by a URL, the plugin will create a cron job that will curl the URL. 
+ +## Config + + + +Configuration for the cron jobs plugin + + + + diff --git a/pkg/service/pipeline/cron_jobs_step.go b/plugins/cron_jobs/plugin.go similarity index 68% rename from pkg/service/pipeline/cron_jobs_step.go rename to plugins/cron_jobs/plugin.go index 226e4329f..a379c5801 100644 --- a/pkg/service/pipeline/cron_jobs_step.go +++ b/plugins/cron_jobs/plugin.go @@ -1,10 +1,15 @@ -package pipeline +// +groupName=plugins.rig.dev -- Only used for config doc generation +// +//nolint:revive +package cron_jobs import ( "context" "fmt" "net/url" + "github.com/hashicorp/go-hclog" + "github.com/rigdev/rig/pkg/controller/plugin" "github.com/rigdev/rig/pkg/errors" "github.com/rigdev/rig/pkg/pipeline" "github.com/rigdev/rig/pkg/ptr" @@ -14,14 +19,36 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -type CronJobStep struct{} +const ( + Name = "rigdev.cron_jobs" +) + +// Configuration for the cron jobs plugin +// +kubebuilder:object:root=true +type Config struct { +} -func NewCronJobStep() *CronJobStep { - return &CronJobStep{} +type Plugin struct { + configBytes []byte +} + +func (p *Plugin) Initialize(req plugin.InitializeRequest) error { + p.configBytes = req.Config + return nil } -func (s *CronJobStep) Apply(_ context.Context, req pipeline.CapsuleRequest) error { - jobs, err := s.createCronJobs(req) +func (p *Plugin) Run(ctx context.Context, req pipeline.CapsuleRequest, logger hclog.Logger) error { + // We do not have any configuration for this step? 
+ // var config Config + var err error + if len(p.configBytes) > 0 { + _, err = plugin.ParseTemplatedConfig[Config](p.configBytes, req, plugin.CapsuleStep[Config]) + if err != nil { + return err + } + } + + jobs, err := p.createCronJobs(req) if err != nil { return err } @@ -35,7 +62,7 @@ func (s *CronJobStep) Apply(_ context.Context, req pipeline.CapsuleRequest) erro return nil } -func (s *CronJobStep) createCronJobs(req pipeline.CapsuleRequest) ([]*batchv1.CronJob, error) { +func (p *Plugin) createCronJobs(req pipeline.CapsuleRequest) ([]*batchv1.CronJob, error) { var res []*batchv1.CronJob deployment := &appsv1.Deployment{} if err := req.GetNew(deployment); errors.IsNotFound(err) { @@ -77,19 +104,15 @@ func (s *CronJobStep) createCronJobs(req pipeline.CapsuleRequest) ([]*batchv1.Cr return nil, fmt.Errorf("neither Command nor URL was set on job %s", job.Name) } - annotations := createPodAnnotations(req) + annotations := pipeline.CreatePodAnnotations(req) j := &batchv1.CronJob{ - TypeMeta: metav1.TypeMeta{ - Kind: "batch/v1", - APIVersion: "CronJob", - }, ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%s", req.Capsule().Name, job.Name), Namespace: req.Capsule().Namespace, Labels: map[string]string{ - LabelCapsule: req.Capsule().Name, - LabelCron: job.Name, + pipeline.LabelCapsule: req.Capsule().Name, + pipeline.LabelCron: job.Name, }, Annotations: annotations, }, @@ -99,8 +122,8 @@ func (s *CronJobStep) createCronJobs(req pipeline.CapsuleRequest) ([]*batchv1.Cr ObjectMeta: metav1.ObjectMeta{ Annotations: annotations, Labels: map[string]string{ - LabelCapsule: req.Capsule().Name, - LabelCron: job.Name, + pipeline.LabelCapsule: req.Capsule().Name, + pipeline.LabelCron: job.Name, }, }, Spec: batchv1.JobSpec{ diff --git a/plugins/deployment/README.md b/plugins/deployment/README.md new file mode 100644 index 000000000..9a2689350 --- /dev/null +++ b/plugins/deployment/README.md @@ -0,0 +1,17 @@ +# Deployment Plugin + +Default plugin for handling deployments in the 
reconciliation pipeline. Another plugin can be specified in the `deploymentStep` in the pipeline in the operator config. +The `rigdev.deployment` plugin will create a deployment for the capsule, and a service if the capsule has interfaces defined. + + + + +## Config + + + +Configuration for the deployment plugin + + + + diff --git a/pkg/service/pipeline/deployment_step.go b/plugins/deployment/plugin.go similarity index 74% rename from pkg/service/pipeline/deployment_step.go rename to plugins/deployment/plugin.go index b47380680..f1400fbb7 100644 --- a/pkg/service/pipeline/deployment_step.go +++ b/plugins/deployment/plugin.go @@ -1,14 +1,17 @@ -package pipeline +// +groupName=plugins.rig.dev -- Only used for config doc generation +// +//nolint:revive +package deployment import ( "context" "crypto/sha256" "fmt" - "path" "slices" - "strings" + "github.com/hashicorp/go-hclog" "github.com/rigdev/rig/pkg/api/v1alpha2" + "github.com/rigdev/rig/pkg/controller/plugin" "github.com/rigdev/rig/pkg/errors" "github.com/rigdev/rig/pkg/hash" "github.com/rigdev/rig/pkg/pipeline" @@ -23,28 +26,48 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" ) const ( - RigDevRolloutLabel = "rig.dev/rollout" + Name = "rigdev.deployment" ) -var _defaultPodLabels = []string{RigDevRolloutLabel} +// Configuration for the deployment plugin +// +kubebuilder:object:root=true +type Config struct { +} -type DeploymentStep struct{} +type Plugin struct { + reader client.Reader + configBytes []byte +} -func NewDeploymentStep() *DeploymentStep { - return &DeploymentStep{} +func (p *Plugin) Initialize(req plugin.InitializeRequest) error { + p.reader = req.Reader + p.configBytes = req.Config + return nil } -func (s *DeploymentStep) Apply(ctx context.Context, req pipeline.CapsuleRequest) error { - cfgs, err := s.getConfigs(ctx, req) +func (p *Plugin) 
Run(ctx context.Context, req pipeline.CapsuleRequest, logger hclog.Logger) error { + // We do not have any configuration for this step? + + // var config Config + var err error + if len(p.configBytes) > 0 { + _, err = plugin.ParseTemplatedConfig[Config](p.configBytes, req, plugin.CapsuleStep[Config]) + if err != nil { + return err + } + } + + cfgs, err := p.getConfigs(ctx, req) if err != nil { return err } - checksums, err := s.getConfigChecksums(req, *cfgs) + checksums, err := p.getConfigChecksums(req, *cfgs) if err != nil { return err } @@ -56,19 +79,29 @@ func (s *DeploymentStep) Apply(ctx context.Context, req pipeline.CapsuleRequest) return err } - deployment, err := s.createDeployment(current, req, cfgs, checksums) + deployment, err := p.createDeployment(current, req, cfgs, checksums) if err != nil { return err } + if len(req.Capsule().Spec.Interfaces) > 0 { + if err := p.handleInterfaces(req, deployment); err != nil { + return err + } + + if err := req.Set(p.createService(req)); err != nil { + return err + } + } + if err := req.Set(deployment); err != nil { return err } - if ok, err := s.shouldCreateHPA(req); err != nil { + if ok, err := p.shouldCreateHPA(req); err != nil { return err } else if ok { - hpa, _, err := s.createHPA(req) + hpa, _, err := p.createHPA(req) if err != nil { return err } @@ -81,113 +114,30 @@ func (s *DeploymentStep) Apply(ctx context.Context, req pipeline.CapsuleRequest) return nil } -func FileToVolume(f v1alpha2.File) (v1.Volume, v1.VolumeMount) { - var volume v1.Volume - var mount v1.VolumeMount - var name string - switch f.Ref.Kind { - case "ConfigMap": - name = "configmap-" + strings.ReplaceAll(f.Ref.Name, ".", "-") - volume = v1.Volume{ - Name: name, - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: f.Ref.Name, - }, - Items: []v1.KeyToPath{ - { - Key: f.Ref.Key, - Path: path.Base(f.Path), - }, - }, - }, - }, - } - case "Secret": - name = "secret-" + 
strings.ReplaceAll(f.Ref.Name, ".", "-") - volume = v1.Volume{ - Name: name, - VolumeSource: v1.VolumeSource{ - Secret: &v1.SecretVolumeSource{ - SecretName: f.Ref.Name, - Items: []v1.KeyToPath{ - { - Key: f.Ref.Key, - Path: path.Base(f.Path), - }, - }, - }, - }, - } - } - if name != "" { - mount = v1.VolumeMount{ - Name: name, - MountPath: f.Path, - SubPath: path.Base(f.Path), - } - } - - return volume, mount -} - -func FilesToVolumes(files []v1alpha2.File) ([]v1.Volume, []v1.VolumeMount) { - var volumes []v1.Volume - var mounts []v1.VolumeMount - for _, f := range files { - volume, mount := FileToVolume(f) - volumes = append(volumes, volume) - mounts = append(mounts, mount) - } - return volumes, mounts -} - -func EnvSources(refs []v1alpha2.EnvReference) []v1.EnvFromSource { - var res []v1.EnvFromSource - for _, e := range refs { - switch e.Kind { - case "ConfigMap": - res = append(res, v1.EnvFromSource{ - ConfigMapRef: &v1.ConfigMapEnvSource{ - LocalObjectReference: v1.LocalObjectReference{Name: e.Name}, - }, - }) - case "Secret": - res = append(res, v1.EnvFromSource{ - SecretRef: &v1.SecretEnvSource{ - LocalObjectReference: v1.LocalObjectReference{Name: e.Name}, - }, - }) - } - } - return res -} - -func (s *DeploymentStep) createDeployment( +func (p *Plugin) createDeployment( current *appsv1.Deployment, req pipeline.CapsuleRequest, cfgs *configs, checksums checksums, ) (*appsv1.Deployment, error) { - volumes, volumeMounts := FilesToVolumes(req.Capsule().Spec.Files) + volumes, volumeMounts := pipeline.FilesToVolumes(req.Capsule().Spec.Files) - podAnnotations := createPodAnnotations(req) + podAnnotations := pipeline.CreatePodAnnotations(req) if checksums.files != "" { - podAnnotations[AnnotationChecksumFiles] = checksums.files + podAnnotations[pipeline.AnnotationChecksumFiles] = checksums.files } if checksums.autoEnv != "" { - podAnnotations[AnnotationChecksumAutoEnv] = checksums.autoEnv + podAnnotations[pipeline.AnnotationChecksumAutoEnv] = checksums.autoEnv } if 
checksums.env != "" { - podAnnotations[AnnotationChecksumEnv] = checksums.env + podAnnotations[pipeline.AnnotationChecksumEnv] = checksums.env } if checksums.sharedEnv != "" { - podAnnotations[AnnotationChecksumSharedEnv] = checksums.sharedEnv + podAnnotations[pipeline.AnnotationChecksumSharedEnv] = checksums.sharedEnv } env := req.Capsule().Spec.Env - envFrom := EnvSources(env.From) + envFrom := pipeline.EnvSources(env.From) if !env.DisableAutomatic { if _, ok := cfgs.configMaps[req.Capsule().GetName()]; ok { envFrom = append(envFrom, v1.EnvFromSource{ @@ -240,7 +190,7 @@ func (s *DeploymentStep) createDeployment( } replicas := ptr.New(int32(req.Capsule().Spec.Scale.Horizontal.Instances.Min)) - hasHPA, err := s.shouldCreateHPA(req) + hasHPA, err := p.shouldCreateHPA(req) if err != nil { return nil, err } @@ -261,13 +211,13 @@ func (s *DeploymentStep) createDeployment( }, Spec: appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ - MatchLabels: s.getPodsSelector(current, req), + MatchLabels: p.getPodsSelector(current, req), }, Replicas: replicas, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: podAnnotations, - Labels: s.getPodLabels(current, req), + Labels: p.getPodLabels(current, req), }, Spec: v1.PodSpec{ Containers: []v1.Container{c}, @@ -288,24 +238,83 @@ func (s *DeploymentStep) createDeployment( return d, nil } -func createPodAnnotations(req pipeline.CapsuleRequest) map[string]string { - podAnnotations := map[string]string{} - for _, l := range _defaultPodLabels { - if v, ok := req.Capsule().Annotations[l]; ok { - podAnnotations[l] = v +func (p *Plugin) handleInterfaces(req pipeline.CapsuleRequest, deployment *appsv1.Deployment) error { + for i, container := range deployment.Spec.Template.Spec.Containers { + if container.Name != req.Capsule().Name { + continue + } + + var ports []v1.ContainerPort + for _, ni := range req.Capsule().Spec.Interfaces { + ports = append(ports, v1.ContainerPort{ + Name: ni.Name, + ContainerPort: 
ni.Port, + }) + + if ni.Liveness != nil { + container.LivenessProbe = &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + HTTPGet: &v1.HTTPGetAction{ + Path: ni.Liveness.Path, + Port: intstr.FromInt32(ni.Port), + }, + }, + } + } + + if ni.Readiness != nil { + container.ReadinessProbe = &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + HTTPGet: &v1.HTTPGetAction{ + Path: ni.Readiness.Path, + Port: intstr.FromInt32(ni.Port), + }, + }, + } + } } + container.Ports = ports + deployment.Spec.Template.Spec.Containers[i] = container } - return podAnnotations + + return nil +} + +func (p *Plugin) createService(req pipeline.CapsuleRequest) *v1.Service { + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: req.Capsule().Name, + Namespace: req.Capsule().Namespace, + Labels: map[string]string{ + pipeline.LabelCapsule: req.Capsule().Name, + }, + }, + Spec: v1.ServiceSpec{ + Selector: map[string]string{ + pipeline.LabelCapsule: req.Capsule().Name, + }, + }, + } + + for _, inf := range req.Capsule().Spec.Interfaces { + svc.Spec.Ports = append(svc.Spec.Ports, v1.ServicePort{ + Name: inf.Name, + Port: inf.Port, + TargetPort: intstr.FromString(inf.Name), + }) + } + + return svc } -func (s *DeploymentStep) getPodLabels(current *appsv1.Deployment, req pipeline.CapsuleRequest) map[string]string { +func (p *Plugin) getPodLabels(current *appsv1.Deployment, req pipeline.CapsuleRequest) map[string]string { labels := map[string]string{} - maps.Copy(labels, s.getPodsSelector(current, req)) - labels[LabelCapsule] = req.Capsule().Name + maps.Copy(labels, p.getPodsSelector(current, req)) + labels[pipeline.LabelCapsule] = req.Capsule().Name return labels } -func (s *DeploymentStep) getPodsSelector(current *appsv1.Deployment, req pipeline.CapsuleRequest) map[string]string { +func (p *Plugin) getPodsSelector(current *appsv1.Deployment, req pipeline.CapsuleRequest) map[string]string { if current != nil { if s := current.Spec.Selector; s != nil { if len(s.MatchLabels) > 0 && len(s.MatchExpressions) 
== 0 { @@ -315,11 +324,11 @@ func (s *DeploymentStep) getPodsSelector(current *appsv1.Deployment, req pipelin } return map[string]string{ - LabelCapsule: req.Capsule().Name, + pipeline.LabelCapsule: req.Capsule().Name, } } -func (s *DeploymentStep) getConfigChecksums(req pipeline.CapsuleRequest, cfgs configs) (checksums, error) { +func (p *Plugin) getConfigChecksums(req pipeline.CapsuleRequest, cfgs configs) (checksums, error) { sharedEnv, err := configSharedEnvChecksum(cfgs) if err != nil { return checksums{}, err @@ -478,7 +487,7 @@ func configFilesChecksum(req pipeline.CapsuleRequest, cfgs configs) (string, err return fmt.Sprintf("%x", h.Sum(nil)), nil } -func (s *DeploymentStep) getConfigs(ctx context.Context, req pipeline.CapsuleRequest) (*configs, error) { +func (p *Plugin) getConfigs(ctx context.Context, req pipeline.CapsuleRequest) (*configs, error) { configs := &configs{ configMaps: map[string]*v1.ConfigMap{}, secrets: map[string]*v1.Secret{}, @@ -486,10 +495,10 @@ func (s *DeploymentStep) getConfigs(ctx context.Context, req pipeline.CapsuleReq // Get shared env var configMapList v1.ConfigMapList - if err := req.Reader().List(ctx, &configMapList, &client.ListOptions{ + if err := p.reader.List(ctx, &configMapList, &client.ListOptions{ Namespace: req.Capsule().Namespace, LabelSelector: labels.SelectorFromSet(labels.Set{ - LabelSharedConfig: "true", + pipeline.LabelSharedConfig: "true", }), }); err != nil { return nil, fmt.Errorf("could not list shared env configmaps: %w", err) @@ -500,10 +509,10 @@ func (s *DeploymentStep) getConfigs(ctx context.Context, req pipeline.CapsuleReq configs.configMaps[cm.Name] = &cm } var secretList v1.SecretList - if err := req.Reader().List(ctx, &secretList, &client.ListOptions{ + if err := p.reader.List(ctx, &secretList, &client.ListOptions{ Namespace: req.Capsule().Namespace, LabelSelector: labels.SelectorFromSet(labels.Set{ - LabelSharedConfig: "true", + pipeline.LabelSharedConfig: "true", }), }); err != nil { return nil, 
fmt.Errorf("could not list shared env secrets: %w", err) @@ -518,32 +527,32 @@ func (s *DeploymentStep) getConfigs(ctx context.Context, req pipeline.CapsuleReq // Get automatic env if !env.DisableAutomatic { - if err := s.setUsedSource(ctx, req, configs, "ConfigMap", req.Capsule().Name, false); err != nil { + if err := p.setUsedSource(ctx, req, configs, "ConfigMap", req.Capsule().Name, false); err != nil { return nil, err } - if err := s.setUsedSource(ctx, req, configs, "Secret", req.Capsule().Name, false); err != nil { + if err := p.setUsedSource(ctx, req, configs, "Secret", req.Capsule().Name, false); err != nil { return nil, err } } // Get envs for _, e := range env.From { - if err := s.setUsedSource(ctx, req, configs, e.Kind, e.Name, true); err != nil { + if err := p.setUsedSource(ctx, req, configs, e.Kind, e.Name, true); err != nil { return nil, err } } // Get files for _, f := range req.Capsule().Spec.Files { - if err := s.setUsedSource(ctx, req, configs, f.Ref.Kind, f.Ref.Name, true); err != nil { + if err := p.setUsedSource(ctx, req, configs, f.Ref.Kind, f.Ref.Name, true); err != nil { return nil, err } } - if secret := req.Capsule().Annotations[AnnotationPullSecret]; secret != "" { + if secret := req.Capsule().Annotations[pipeline.AnnotationPullSecret]; secret != "" { configs.imagePullSecret = secret - if err := s.setUsedSource(ctx, req, configs, "Secret", secret, true); err != nil { + if err := p.setUsedSource(ctx, req, configs, "Secret", secret, true); err != nil { return nil, err } } @@ -551,7 +560,7 @@ func (s *DeploymentStep) getConfigs(ctx context.Context, req pipeline.CapsuleReq return configs, nil } -func (s *DeploymentStep) setUsedSource( +func (p *Plugin) setUsedSource( ctx context.Context, req pipeline.CapsuleRequest, cfgs *configs, @@ -586,7 +595,7 @@ func (s *DeploymentStep) setUsedSource( return nil } var cm v1.ConfigMap - if err := req.Reader().Get(ctx, types.NamespacedName{ + if err := p.reader.Get(ctx, types.NamespacedName{ Name: name, 
Namespace: req.Capsule().Namespace, }, &cm); err != nil { @@ -599,7 +608,7 @@ func (s *DeploymentStep) setUsedSource( return nil } var s v1.Secret - if err := req.Reader().Get(ctx, types.NamespacedName{ + if err := p.reader.Get(ctx, types.NamespacedName{ Name: name, Namespace: req.Capsule().Namespace, }, &s); err != nil { @@ -611,15 +620,15 @@ func (s *DeploymentStep) setUsedSource( return nil } -func (s *DeploymentStep) shouldCreateHPA(req pipeline.CapsuleRequest) (bool, error) { - _, res, err := s.createHPA(req) +func (p *Plugin) shouldCreateHPA(req pipeline.CapsuleRequest) (bool, error) { + _, res, err := p.createHPA(req) if err != nil { return false, err } return res, nil } -func (s *DeploymentStep) createHPA(req pipeline.CapsuleRequest) (*autoscalingv2.HorizontalPodAutoscaler, bool, error) { +func (p *Plugin) createHPA(req pipeline.CapsuleRequest) (*autoscalingv2.HorizontalPodAutoscaler, bool, error) { hpa := &autoscalingv2.HorizontalPodAutoscaler{ ObjectMeta: metav1.ObjectMeta{ Name: req.Capsule().Name, diff --git a/plugins/google_cloud_sql_auth_proxy/plugin.go b/plugins/google_cloud_sql_auth_proxy/plugin.go index 713a9c384..a44ec9436 100644 --- a/plugins/google_cloud_sql_auth_proxy/plugin.go +++ b/plugins/google_cloud_sql_auth_proxy/plugin.go @@ -11,7 +11,6 @@ import ( "github.com/rigdev/rig/pkg/errors" "github.com/rigdev/rig/pkg/pipeline" "github.com/rigdev/rig/pkg/ptr" - svc_pipeline "github.com/rigdev/rig/pkg/service/pipeline" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -116,7 +115,7 @@ func (p *Plugin) Run(_ context.Context, req pipeline.CapsuleRequest, _ hclog.Log resources[corev1.ResourceMemory] = memory } - volume, mounts := svc_pipeline.FilesToVolumes(config.Files) + volume, mounts := pipeline.FilesToVolumes(config.Files) for _, v := range volume { for _, vv := range deployment.Spec.Template.Spec.Volumes { found := false @@ -134,7 +133,7 @@ func (p *Plugin) Run(_ context.Context, req 
pipeline.CapsuleRequest, _ hclog.Log Name: "google-cloud-sql-proxy", Image: image, Args: args, - EnvFrom: svc_pipeline.EnvSources(config.EnvFromSource), + EnvFrom: pipeline.EnvSources(config.EnvFromSource), Env: config.EnvVars, Resources: corev1.ResourceRequirements{ Requests: resources, diff --git a/plugins/ingress_routes/README.md b/plugins/ingress_routes/README.md index 93388f174..5d040a375 100644 --- a/plugins/ingress_routes/README.md +++ b/plugins/ingress_routes/README.md @@ -10,14 +10,13 @@ Config (in context of the rig-operator Helm values): ``` config: pipeline: - steps: - - plugins: - - name: rigdev.ingress_routes - config: | - clusterIssuer: letsencrypt-prod - createCertificateResources: true - ingressClassName: nginx - disableTLS: false + routesStep: + plugin: "rigdev.ingress_routes" + config: | + clusterIssuer: letsencrypt-prod + createCertificateResources: true + ingressClassName: nginx + disableTLS: false ``` ## Config diff --git a/plugins/service_account/README.md b/plugins/service_account/README.md new file mode 100644 index 000000000..154a48f03 --- /dev/null +++ b/plugins/service_account/README.md @@ -0,0 +1,12 @@ +# Service Account Plugin +The `rigdev.service_account` plugin provides the default way of handling service accounts in the reconcilliation pipeline. It will create a service account with the name and namespace of the capsule. 
+ +## Config + + + +Configuration for the deployment plugin + + + + diff --git a/plugins/service_account/plugin.go b/plugins/service_account/plugin.go new file mode 100644 index 000000000..209256f3f --- /dev/null +++ b/plugins/service_account/plugin.go @@ -0,0 +1,62 @@ +// +groupName=plugins.rig.dev -- Only used for config doc generation +// +//nolint:revive +package service_account + +import ( + "context" + + "github.com/hashicorp/go-hclog" + "github.com/rigdev/rig/pkg/controller/plugin" + "github.com/rigdev/rig/pkg/pipeline" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + Name = "rigdev.service_account" +) + +// Configuration for the deployment plugin +// +kubebuilder:object:root=true +type Config struct { +} + +type Plugin struct { + configBytes []byte +} + +func (p *Plugin) Initialize(req plugin.InitializeRequest) error { + p.configBytes = req.Config + return nil +} + +func (p *Plugin) Run(ctx context.Context, req pipeline.CapsuleRequest, logger hclog.Logger) error { + // We do not have any configuration for this step? 
+ // var config Config + var err error + if len(p.configBytes) > 0 { + _, err = plugin.ParseTemplatedConfig[Config](p.configBytes, req, plugin.CapsuleStep[Config]) + if err != nil { + return err + } + } + + sa := p.createServiceAccount(req) + return req.Set(sa) +} + +func (s *Plugin) createServiceAccount(req pipeline.CapsuleRequest) *corev1.ServiceAccount { + sa := &corev1.ServiceAccount{ + TypeMeta: metav1.TypeMeta{ + Kind: "ServiceAccount", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: req.Capsule().Name, + Namespace: req.Capsule().Namespace, + }, + } + + return sa +} diff --git a/plugins/service_monitor/README.md b/plugins/service_monitor/README.md new file mode 100644 index 000000000..fc516e70c --- /dev/null +++ b/plugins/service_monitor/README.md @@ -0,0 +1,30 @@ +# Service Monitor Plugin +The `rigdev.service_monitor` plugin spawns a Prometheus ServiceMonitor per capsule +for use with a Prometheus Operator stack. The service monitor will monitor services with the same name as the capsule and will use the endpoint specified by the `path` and `portName` fields in the configuration. 
+ +## Example + +Config (in the context of the rig-operator Helm values): +``` +config: + pipeline: + serviceMonitorStep: + plugin: "rigdev.service_monitor" + config: | + path: metrics + portName: metricsport +``` + +## Config + + + +Configuration for the deployment plugin + +| Field | Description | +| --- | --- | +| `Path` _string_ | | +| `PortName` _string_ | | + + + diff --git a/plugins/service_monitor/plugin.go b/plugins/service_monitor/plugin.go new file mode 100644 index 000000000..ca09b9b5c --- /dev/null +++ b/plugins/service_monitor/plugin.go @@ -0,0 +1,83 @@ +// +groupName=plugins.rig.dev -- Only used for config doc generation +// +//nolint:revive +package service_monitor + +import ( + "context" + + "github.com/hashicorp/go-hclog" + monitorv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + "github.com/rigdev/rig/pkg/controller/plugin" + "github.com/rigdev/rig/pkg/errors" + "github.com/rigdev/rig/pkg/pipeline" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + Name = "rigdev.service_monitor" +) + +// Configuration for the deployment plugin +// +kubebuilder:object:root=true +type Config struct { + Path string + PortName string +} + +type Plugin struct { + configBytes []byte +} + +func (p *Plugin) Initialize(req plugin.InitializeRequest) error { + p.configBytes = req.Config + return nil +} + +func (p *Plugin) Run(ctx context.Context, req pipeline.CapsuleRequest, logger hclog.Logger) error { + // We do not have any configuration for this step? + var cfg Config + var err error + if len(p.configBytes) > 0 { + cfg, err = plugin.ParseTemplatedConfig[Config](p.configBytes, req, plugin.CapsuleStep[Config]) + if err != nil { + return err + } + } + + // Consider returning an error. If you get this far, you should have a configuration. 
+ if cfg.PortName == "" { + return errors.InvalidArgumentErrorf("portName is required to create a ServiceMonitor") + } + + serviceMonitor := p.createPrometheusServiceMonitor(req, cfg) + return req.Set(serviceMonitor) +} + +func (p *Plugin) createPrometheusServiceMonitor(req pipeline.CapsuleRequest, cfg Config) *monitorv1.ServiceMonitor { + return &monitorv1.ServiceMonitor{ + TypeMeta: metav1.TypeMeta{ + Kind: "ServiceMonitor", + APIVersion: "monitoring.coreos.com/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: req.Capsule().Name, + Namespace: req.Capsule().Namespace, + ResourceVersion: "", + Labels: map[string]string{ + pipeline.LabelCapsule: req.Capsule().Name, + }, + }, + Spec: monitorv1.ServiceMonitorSpec{ + Selector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.LabelCapsule: req.Capsule().Name, + }, + }, + Endpoints: []monitorv1.Endpoint{{ + Port: cfg.PortName, + Path: cfg.Path, + }}, + }, + } +} diff --git a/plugins/vpa/README.md b/plugins/vpa/README.md new file mode 100644 index 000000000..6983dfbbc --- /dev/null +++ b/plugins/vpa/README.md @@ -0,0 +1,12 @@ +## Vertical Pod Autoscaler Plugin + + +## Config + + + +Configuration for the deployment plugin + + + + diff --git a/pkg/service/pipeline/vpa_step.go b/plugins/vpa/plugin.go similarity index 72% rename from pkg/service/pipeline/vpa_step.go rename to plugins/vpa/plugin.go index bb54ea04a..7748bf399 100644 --- a/pkg/service/pipeline/vpa_step.go +++ b/plugins/vpa/plugin.go @@ -1,10 +1,14 @@ -package pipeline +// +groupName=plugins.rig.dev -- Only used for config doc generation +// +//nolint:revive +package vpa import ( "context" "fmt" - "github.com/rigdev/rig/pkg/api/config/v1alpha1" + "github.com/hashicorp/go-hclog" + "github.com/rigdev/rig/pkg/controller/plugin" "github.com/rigdev/rig/pkg/pipeline" "github.com/rigdev/rig/pkg/ptr" appsv1 "k8s.io/api/apps/v1" @@ -15,26 +19,40 @@ import ( vpav1 "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" ) -type VPAStep 
struct { - cfg *v1alpha1.OperatorConfig +const ( + Name = "rigdev.vpa" +) + +// Configuration for the deployment plugin +// +kubebuilder:object:root=true +type Config struct { } -func NewVPAStep(cfg *v1alpha1.OperatorConfig) *VPAStep { - return &VPAStep{ - cfg: cfg, - } +type Plugin struct { + configBytes []byte +} + +func (p *Plugin) Initialize(req plugin.InitializeRequest) error { + p.configBytes = req.Config + return nil } -func (s *VPAStep) Apply(_ context.Context, req pipeline.CapsuleRequest) error { - if !s.cfg.VerticalPodAutoscaler.Enabled { - return nil +func (p *Plugin) Run(ctx context.Context, req pipeline.CapsuleRequest, logger hclog.Logger) error { + // We do not have any configuration for this step? + // var cfg Config + var err error + if len(p.configBytes) > 0 { + _, err = plugin.ParseTemplatedConfig[Config](p.configBytes, req, plugin.CapsuleStep[Config]) + if err != nil { + return err + } } - vpa := s.createVPA(req) + vpa := p.createVPA(req) return req.Set(vpa) } -func (s *VPAStep) createVPA(req pipeline.CapsuleRequest) *vpav1.VerticalPodAutoscaler { +func (p *Plugin) createVPA(req pipeline.CapsuleRequest) *vpav1.VerticalPodAutoscaler { vpa := &vpav1.VerticalPodAutoscaler{ TypeMeta: metav1.TypeMeta{ Kind: "VerticalPodAutoscaler", @@ -66,7 +84,7 @@ func (s *VPAStep) createVPA(req pipeline.CapsuleRequest) *vpav1.VerticalPodAutos } // This should be used once we create a VPA per namespace -func (s *VPAStep) createVPARecommender(req pipeline.CapsuleRequest) *appsv1.Deployment { //nolint:unused +func (p *Plugin) createVPARecommender(req pipeline.CapsuleRequest) *appsv1.Deployment { //nolint:unused return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-vpa", req.Capsule().Namespace), diff --git a/proto/rig/operator/api/v1/plugin/service.proto b/proto/rig/operator/api/v1/plugin/service.proto index 54d5a664d..0527debe6 100644 --- a/proto/rig/operator/api/v1/plugin/service.proto +++ b/proto/rig/operator/api/v1/plugin/service.proto @@ 
-19,6 +19,8 @@ message InitializeRequest { string plugin_config = 1; bytes operator_config = 2; string tag = 3; + // If not given the plugin will try to initialize using the in-cluster-config. + RestConfig rest_config = 4; } message InitializeResponse {} @@ -73,3 +75,9 @@ message ListObjectsRequest { message ListObjectsResponse { repeated bytes objects = 1; } + +message RestConfig { + string host = 1; + string bearer_token = 2; + bytes tls_config = 3; +} diff --git a/test/integration/k8s/capsule_controller_test.go b/test/integration/k8s/capsule_controller_test.go index 754649137..dfbf0ed1b 100644 --- a/test/integration/k8s/capsule_controller_test.go +++ b/test/integration/k8s/capsule_controller_test.go @@ -10,8 +10,8 @@ import ( "github.com/google/uuid" monitorv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "github.com/rigdev/rig/pkg/api/v1alpha2" - "github.com/rigdev/rig/pkg/controller" "github.com/rigdev/rig/pkg/hash" + "github.com/rigdev/rig/pkg/pipeline" appsv1 "k8s.io/api/apps/v1" autoscalingv2 "k8s.io/api/autoscaling/v2" batchv1 "k8s.io/api/batch/v1" @@ -85,7 +85,7 @@ func (s *K8sTestSuite) TestControllerSharedSecrets() { Name: uuid.NewString(), Namespace: nsName.Namespace, Labels: map[string]string{ - controller.LabelSharedConfig: "true", + pipeline.LabelSharedConfig: "true", }, }, Data: map[string][]byte{ @@ -133,7 +133,7 @@ func (s *K8sTestSuite) TestControllerSharedSecrets() { }, waitFor, tick) secret.Name = uuid.NewString() - delete(secret.Labels, controller.LabelSharedConfig) + delete(secret.Labels, pipeline.LabelSharedConfig) secret.ResourceVersion = "" s.Require().NoError(s.Client.Create(ctx, &secret)) @@ -262,7 +262,6 @@ func (s *K8sTestSuite) TestController() { s.testCreateCapsule(ctx) s.testInterface(ctx) s.testIngress(ctx) - s.testLoadbalancer(ctx) s.testIngressRoutes(ctx) s.testEnvVar(ctx) s.testConfigMap(ctx) @@ -565,46 +564,6 @@ func (s *K8sTestSuite) testIngress(ctx context.Context) { }) } -func (s *K8sTestSuite) 
testLoadbalancer(ctx context.Context) { - s.by("Changing ingress to loadbalancer") - capsuleOwnerRef := s.updateCapsule(ctx, func(c *v1alpha2.Capsule) { - c.Spec.Interfaces[0].Public = &v1alpha2.CapsulePublicInterface{ - LoadBalancer: &v1alpha2.CapsuleInterfaceLoadBalancer{ - Port: 1, - }, - } - }) - - s.Assert().Eventually(func() bool { - if err := s.Client.Get(ctx, nsName, &netv1.Ingress{}); err != nil { - if kerrors.IsNotFound(err) { - return true - } - } - return false - }, waitFor, tick) - - s.expectResources(ctx, []client.Object{ - &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-lb", nsName.Name), - Namespace: nsName.Namespace, - OwnerReferences: []metav1.OwnerReference{ - capsuleOwnerRef, - }, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{{ - Name: "http", - Port: 1, - TargetPort: intstr.FromString("http"), - }}, - Type: v1.ServiceTypeLoadBalancer, - }, - }, - }) -} - func (s *K8sTestSuite) testIngressRoutes(ctx context.Context) { s.by("Adding ingress routes") capsuleOwnerRef := s.updateCapsule(ctx, func(c *v1alpha2.Capsule) { @@ -719,7 +678,7 @@ func (s *K8sTestSuite) testEnvVar(ctx context.Context) { Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - controller.AnnotationChecksumAutoEnv: fmt.Sprintf("%x", h.Sum(nil)), + pipeline.AnnotationChecksumAutoEnv: fmt.Sprintf("%x", h.Sum(nil)), }, }, Spec: v1.PodSpec{ @@ -782,7 +741,7 @@ func (s *K8sTestSuite) testConfigMap(ctx context.Context) { Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - controller.AnnotationChecksumFiles: fmt.Sprintf("%x", h.Sum(nil)), + pipeline.AnnotationChecksumFiles: fmt.Sprintf("%x", h.Sum(nil)), }, }, Spec: v1.PodSpec{ @@ -818,7 +777,7 @@ func (s *K8sTestSuite) testConfigMap(ctx context.Context) { Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - controller.AnnotationChecksumFiles: fmt.Sprintf("%x", h.Sum(nil)), + 
pipeline.AnnotationChecksumFiles: fmt.Sprintf("%x", h.Sum(nil)), }, }, Spec: v1.PodSpec{ @@ -983,8 +942,8 @@ func (s *K8sTestSuite) testCronJob(ctx context.Context) { Name: "test-job1", Namespace: nsName.Namespace, Labels: map[string]string{ - controller.LabelCapsule: nsName.Name, - controller.LabelCron: "job1", + pipeline.LabelCapsule: nsName.Name, + pipeline.LabelCron: "job1", }, OwnerReferences: []metav1.OwnerReference{capsuleOwnerRef}, }, @@ -993,8 +952,8 @@ func (s *K8sTestSuite) testCronJob(ctx context.Context) { JobTemplate: batchv1.JobTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - controller.LabelCapsule: nsName.Name, - controller.LabelCron: "job1", + pipeline.LabelCapsule: nsName.Name, + pipeline.LabelCron: "job1", }, // Annotations: map[string]string{}, }, @@ -1021,8 +980,8 @@ func (s *K8sTestSuite) testCronJob(ctx context.Context) { Name: "test-job2", Namespace: nsName.Namespace, Labels: map[string]string{ - controller.LabelCapsule: nsName.Name, - controller.LabelCron: "job2", + pipeline.LabelCapsule: nsName.Name, + pipeline.LabelCron: "job2", }, OwnerReferences: []metav1.OwnerReference{capsuleOwnerRef}, }, @@ -1031,8 +990,8 @@ func (s *K8sTestSuite) testCronJob(ctx context.Context) { JobTemplate: batchv1.JobTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - controller.LabelCapsule: nsName.Name, - controller.LabelCron: "job2", + pipeline.LabelCapsule: nsName.Name, + pipeline.LabelCron: "job2", }, }, Spec: batchv1.JobSpec{ @@ -1062,14 +1021,14 @@ func (s *K8sTestSuite) testPrometheusServiceMonitor(ctx context.Context) { Name: nsName.Name, Namespace: nsName.Namespace, Labels: map[string]string{ - controller.LabelCapsule: nsName.Name, + pipeline.LabelCapsule: nsName.Name, }, OwnerReferences: []metav1.OwnerReference{capsuleOwnerRef}, }, Spec: monitorv1.ServiceMonitorSpec{ Selector: metav1.LabelSelector{ MatchLabels: map[string]string{ - controller.LabelCapsule: nsName.Name, + pipeline.LabelCapsule: 
nsName.Name, }, }, Endpoints: []monitorv1.Endpoint{{ diff --git a/test/integration/k8s/plugin_suite_test.go b/test/integration/k8s/plugin_suite_test.go index 8cde63307..71adda6cf 100644 --- a/test/integration/k8s/plugin_suite_test.go +++ b/test/integration/k8s/plugin_suite_test.go @@ -44,7 +44,9 @@ func (s *PluginTestSuite) SetupSuite() { t := s.Suite.T() + scheme := scheme.New() s.TestEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{ filepath.Join("..", "..", "..", "deploy", "kustomize", "crd", "bases"), filepath.Join("."), @@ -62,7 +64,6 @@ func (s *PluginTestSuite) SetupSuite() { require.NoError(t, err) require.NotNil(t, cfg) - scheme := scheme.New() manager, err := ctrl.NewManager(cfg, ctrl.Options{ Scheme: scheme, Metrics: server.Options{BindAddress: "0"}, @@ -79,11 +80,13 @@ func (s *PluginTestSuite) SetupSuite() { s.Client = k8sClient opConfig := &configv1alpha1.OperatorConfig{ - PrometheusServiceMonitor: &configv1alpha1.PrometheusServiceMonitor{ - Path: "metrics", - PortName: "metricsport", - }, Pipeline: configv1alpha1.Pipeline{ + ServiceMonitorStep: configv1alpha1.CapsuleStep{ + Plugin: "rigdev.service_monitor", + Config: ` +path: "metrics" +portName: "metricsport"`, + }, Steps: []configv1alpha1.Step{ { Plugins: []configv1alpha1.Plugin{ @@ -130,7 +133,7 @@ container: wd, err := os.Getwd() require.NoError(t, err) builtinBinPath := path.Join(path.Dir(path.Dir(path.Dir(wd))), "bin", "rig-operator") - pmanager, err := plugin.NewManager(plugin.SetBuiltinBinaryPathOption(builtinBinPath)) + pmanager, err := plugin.NewManager(cfg, plugin.SetBuiltinBinaryPathOption(builtinBinPath)) require.NoError(t, err) lc := fxtest.NewLifecycle(t) ps := pipeline.NewService(opConfig, cc, cs, ctrl.Log, pmanager, lc) diff --git a/test/integration/k8s/suite_test.go b/test/integration/k8s/suite_test.go index 4844a34e2..a8e5acbaa 100644 --- a/test/integration/k8s/suite_test.go +++ b/test/integration/k8s/suite_test.go @@ -54,6 +54,7 @@ func (s *K8sTestSuite) SetupSuite() { t 
:= s.Suite.T() + scheme := scheme.New() s.TestEnv = &envtest.Environment{ CRDDirectoryPaths: []string{ filepath.Join("..", "..", "..", "deploy", "kustomize", "crd", "bases"), @@ -66,10 +67,10 @@ func (s *K8sTestSuite) SetupSuite() { var err error cfg, err := s.TestEnv.Start() + require.NoError(t, err) require.NotNil(t, cfg) - scheme := scheme.New() manager, err := ctrl.NewManager(cfg, ctrl.Options{ Scheme: scheme, Metrics: server.Options{BindAddress: "0"}, @@ -87,7 +88,7 @@ func (s *K8sTestSuite) SetupSuite() { opConfig := &configv1alpha1.OperatorConfig{ Pipeline: configv1alpha1.Pipeline{ - RoutesStep: configv1alpha1.RoutesStep{ + RoutesStep: configv1alpha1.CapsuleStep{ Plugin: "rigdev.ingress_routes", Config: ` clusterIssuer: "test" @@ -96,10 +97,12 @@ ingressClassName: "" disableTLS: false `, }, - }, - PrometheusServiceMonitor: &configv1alpha1.PrometheusServiceMonitor{ - Path: "metrics", - PortName: "metricsport", + ServiceMonitorStep: configv1alpha1.CapsuleStep{ + Plugin: "rigdev.service_monitor", + Config: ` +path: "metrics" +portName: "metricsport"`, + }, }, } @@ -111,7 +114,7 @@ disableTLS: false wd, err := os.Getwd() require.NoError(t, err) builtinBinPath := path.Join(path.Dir(path.Dir(path.Dir(wd))), "bin", "rig-operator") - pmanager, err := plugin.NewManager(plugin.SetBuiltinBinaryPathOption(builtinBinPath)) + pmanager, err := plugin.NewManager(cfg, plugin.SetBuiltinBinaryPathOption(builtinBinPath)) require.NoError(t, err) cs := capabilities.NewService(cc, clientSet.Discovery(), nil)