diff --git a/buildscripts/generate-manifests.sh b/buildscripts/generate-manifests.sh index 21135b0f..87052ba8 100755 --- a/buildscripts/generate-manifests.sh +++ b/buildscripts/generate-manifests.sh @@ -61,6 +61,21 @@ echo ' cat deploy/yamls/local.openebs.io_lvmsnapshots.yaml >> deploy/yamls/lvmsnapshot-crd.yaml rm deploy/yamls/local.openebs.io_lvmsnapshots.yaml +echo ' + +############################################## +########### ############ +########### LVMNode CRD ############ +########### ############ +############################################## + +# LVMNode CRD is autogenerated via `make manifests` command. +# Do the modification in the code and run the `make manifests` command +# to generate the CRD definition' > deploy/yamls/lvmnode-crd.yaml + +cat deploy/yamls/local.openebs.io_lvmnodes.yaml >> deploy/yamls/lvmnode-crd.yaml +rm deploy/yamls/local.openebs.io_lvmnodes.yaml + ## create the operator file using all the yamls echo '# This manifest is autogenerated via `make manifests` command @@ -80,6 +95,9 @@ cat deploy/yamls/lvmvolume-crd.yaml >> deploy/lvm-operator.yaml # Add LVMSnapshot v1alpha1 CRDs to the Operator yaml cat deploy/yamls/lvmsnapshot-crd.yaml >> deploy/lvm-operator.yaml +# Add LVMNode v1alpha1 CRDs to the Operator yaml +cat deploy/yamls/lvmnode-crd.yaml >> deploy/lvm-operator.yaml + # Add the driver deployment to the Operator yaml cat deploy/yamls/lvm-driver.yaml >> deploy/lvm-operator.yaml diff --git a/deploy/lvm-operator.yaml b/deploy/lvm-operator.yaml index eb9ae7b9..ee601349 100644 --- a/deploy/lvm-operator.yaml +++ b/deploy/lvm-operator.yaml @@ -260,6 +260,118 @@ status: conditions: [] storedVersions: [] + +############################################## +########### ############ +########### LVMNode CRD ############ +########### ############ +############################################## + +# LVMNode CRD is autogenerated via `make manifests` command. +# Do the modification in the code and run the `make manifests` command +# to generate the CRD definition + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.2.8 + creationTimestamp: null + name: lvmnodes.local.openebs.io +spec: + group: local.openebs.io + names: + kind: LVMNode + listKind: LVMNodeList + plural: lvmnodes + shortNames: + - lvmnode + singular: lvmnode + preserveUnknownFields: false + scope: Namespaced + validation: + openAPIV3Schema: + description: LVMNode records information about all lvm volume groups available + in a node. In general, the openebs node-agent creates the LVMNode object & + periodically synchronizing the volume groups available in the node. LVMNode + has an owner reference pointing to the corresponding node object. + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' + type: string + metadata: + type: object + volumeGroups: + items: + description: VolumeGroup specifies attributes of a given vg exists on + node. + properties: + free: + anyOf: + - type: integer + - type: string + description: Free specifies the available capacity of volume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + lvCount: + description: LVCount denotes total number of logical volumes in volume + group. + format: int32 + minimum: 0 + type: integer + name: + description: Name of the lvm volume group. + minLength: 1 + type: string + pvCount: + description: PVCount denotes total number of physical volumes constituting + the volume group. + format: int32 + minimum: 0 + type: integer + size: + anyOf: + - type: integer + - type: string + description: Size specifies the total size of volume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + uuid: + description: UUID denotes a unique identity of a lvm volume group. + minLength: 1 + type: string + required: + - free + - lvCount + - name + - pvCount + - size + - uuid + type: object + type: array + required: + - volumeGroups + type: object + version: v1alpha1 + versions: + - name: v1alpha1 + served: true + storage: true +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] + --- # Create the CSI Driver object @@ -939,6 +1051,9 @@ rules: - apiGroups: ["storage.k8s.io"] resources: ["storageclasses", "csinodes"] verbs: ["get", "list", "watch"] + - apiGroups: [ "storage.k8s.io" ] + resources: [ "csistoragecapacities"] + verbs: ["*"] - apiGroups: [""] resources: ["events"] verbs: ["list", "watch", "create", "update", "patch"] @@ -952,7 +1067,7 @@ rules: resources: ["pods"] verbs: ["get", "list", "watch", "update", "patch"] - apiGroups: ["local.openebs.io"] - resources: ["lvmvolumes", "lvmsnapshots"] + resources: ["lvmvolumes", "lvmsnapshots", "lvmnodes"] verbs: ["*"] --- @@ -1159,7 +1274,7 @@ rules: resources: ["persistentvolumes", "nodes", "services"] verbs: ["get", "list"] - apiGroups: ["local.openebs.io"] - resources: ["lvmvolumes", "lvmsnapshots"] + resources: ["lvmvolumes", "lvmsnapshots", "lvmnodes"] verbs: ["get", "list", "watch", "create", "update", "patch"] --- diff --git a/deploy/yamls/lvm-driver.yaml b/deploy/yamls/lvm-driver.yaml index 96f914a2..9ba8d91c 100644 --- a/deploy/yamls/lvm-driver.yaml +++ b/deploy/yamls/lvm-driver.yaml @@ -678,6 +678,9 @@ rules: - apiGroups: ["storage.k8s.io"] resources: ["storageclasses", "csinodes"] verbs: ["get", "list", "watch"] + - apiGroups: [ "storage.k8s.io" ] + resources: [ "csistoragecapacities"] + verbs: ["*"] - apiGroups: [""] resources: ["events"] verbs: ["list", "watch", "create", "update", "patch"] @@ -691,7 +694,7 @@ rules: resources: ["pods"] verbs: ["get", "list", "watch", "update", "patch"] - apiGroups: ["local.openebs.io"] - resources: ["lvmvolumes", "lvmsnapshots"] + resources: ["lvmvolumes", "lvmsnapshots", "lvmnodes"] verbs: ["*"] --- @@ -898,7 +901,7 @@ rules: resources: ["persistentvolumes", "nodes", "services"] verbs: ["get", "list"] - apiGroups: ["local.openebs.io"] - resources: ["lvmvolumes", "lvmsnapshots"] + resources: ["lvmvolumes", "lvmsnapshots", "lvmnodes"] verbs: ["get", "list", "watch", "create", "update", "patch"] --- diff --git a/deploy/yamls/lvmnode-crd.yaml b/deploy/yamls/lvmnode-crd.yaml new file mode 100644 index 00000000..99e40ec2 --- /dev/null +++ b/deploy/yamls/lvmnode-crd.yaml @@ -0,0 +1,112 @@ + + +############################################## +########### ############ +########### LVMNode CRD ############ +########### ############ +############################################## + +# LVMNode CRD is autogenerated via `make manifests` command. +# Do the modification in the code and run the `make manifests` command +# to generate the CRD definition + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.2.8 + creationTimestamp: null + name: lvmnodes.local.openebs.io +spec: + group: local.openebs.io + names: + kind: LVMNode + listKind: LVMNodeList + plural: lvmnodes + shortNames: + - lvmnode + singular: lvmnode + preserveUnknownFields: false + scope: Namespaced + validation: + openAPIV3Schema: + description: LVMNode records information about all lvm volume groups available + in a node. In general, the openebs node-agent creates the LVMNode object & + periodically synchronizing the volume groups available in the node. LVMNode + has an owner reference pointing to the corresponding node object. + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' + type: string + metadata: + type: object + volumeGroups: + items: + description: VolumeGroup specifies attributes of a given vg exists on + node. + properties: + free: + anyOf: + - type: integer + - type: string + description: Free specifies the available capacity of volume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + lvCount: + description: LVCount denotes total number of logical volumes in volume + group. + format: int32 + minimum: 0 + type: integer + name: + description: Name of the lvm volume group. + minLength: 1 + type: string + pvCount: + description: PVCount denotes total number of physical volumes constituting + the volume group. + format: int32 + minimum: 0 + type: integer + size: + anyOf: + - type: integer + - type: string + description: Size specifies the total size of volume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + uuid: + description: UUID denotes a unique identity of a lvm volume group. + minLength: 1 + type: string + required: + - free + - lvCount + - name + - pvCount + - size + - uuid + type: object + type: array + required: + - volumeGroups + type: object + version: v1alpha1 + versions: + - name: v1alpha1 + served: true + storage: true +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/pkg/apis/openebs.io/lvm/v1alpha1/lvmnode.go b/pkg/apis/openebs.io/lvm/v1alpha1/lvmnode.go new file mode 100644 index 00000000..38de28aa --- /dev/null +++ b/pkg/apis/openebs.io/lvm/v1alpha1/lvmnode.go @@ -0,0 +1,80 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +resource:path=lvmnode + +// LVMNode records information about all lvm volume groups available +// in a node. In general, the openebs node-agent creates the LVMNode +// object & periodically synchronizing the volume groups available in the node. +// LVMNode has an owner reference pointing to the corresponding node object. +// +kubebuilder:object:root=true +// +kubebuilder:resource:scope=Namespaced,shortName=lvmnode +type LVMNode struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + VolumeGroups []VolumeGroup `json:"volumeGroups"` +} + +// VolumeGroup specifies attributes of a given vg exists on node. +type VolumeGroup struct { + // Name of the lvm volume group. + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + Name string `json:"name"` + + // UUID denotes a unique identity of a lvm volume group. + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + UUID string `json:"uuid"` + + // Size specifies the total size of volume group. + // +kubebuilder:validation:Required + Size resource.Quantity `json:"size"` + // Free specifies the available capacity of volume group. + // +kubebuilder:validation:Required + Free resource.Quantity `json:"free"` + + // LVCount denotes total number of logical volumes in + // volume group. + // +kubebuilder:validation:Required + // +kubebuilder:validation:Minimum=0 + LVCount int32 `json:"lvCount"` + // PVCount denotes total number of physical volumes + // constituting the volume group. + // +kubebuilder:validation:Required + // +kubebuilder:validation:Minimum=0 + PVCount int32 `json:"pvCount"` +} + +// LVMNodeList is a collection of LVMNode resources +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +resource:path=lvmnodes +type LVMNodeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + + Items []LVMNode `json:"items"` +} diff --git a/pkg/apis/openebs.io/lvm/v1alpha1/register.go b/pkg/apis/openebs.io/lvm/v1alpha1/register.go index e05b4c26..ea58f5a8 100644 --- a/pkg/apis/openebs.io/lvm/v1alpha1/register.go +++ b/pkg/apis/openebs.io/lvm/v1alpha1/register.go @@ -73,6 +73,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &LVMVolumeList{}, &LVMSnapshot{}, &LVMSnapshotList{}, + &LVMNode{}, + &LVMNodeList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go index a95595d0..ebd67268 100644 --- a/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go @@ -24,6 +24,72 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LVMNode) DeepCopyInto(out *LVMNode) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + if in.VolumeGroups != nil { + in, out := &in.VolumeGroups, &out.VolumeGroups + *out = make([]VolumeGroup, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LVMNode. +func (in *LVMNode) DeepCopy() *LVMNode { + if in == nil { + return nil + } + out := new(LVMNode) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *LVMNode) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LVMNodeList) DeepCopyInto(out *LVMNodeList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]LVMNode, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LVMNodeList. +func (in *LVMNodeList) DeepCopy() *LVMNodeList { + if in == nil { + return nil + } + out := new(LVMNodeList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *LVMNodeList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LVMSnapshot) DeepCopyInto(out *LVMSnapshot) { *out = *in @@ -199,6 +265,24 @@ func (in *VolumeError) DeepCopy() *VolumeError { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *VolumeGroup) DeepCopyInto(out *VolumeGroup) { + *out = *in + out.Size = in.Size.DeepCopy() + out.Free = in.Free.DeepCopy() + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VolumeGroup. +func (in *VolumeGroup) DeepCopy() *VolumeGroup { + if in == nil { + return nil + } + out := new(VolumeGroup) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VolumeInfo) DeepCopyInto(out *VolumeInfo) { *out = *in diff --git a/pkg/builder/nodebuilder/kubernetes.go b/pkg/builder/nodebuilder/kubernetes.go new file mode 100644 index 00000000..8375d463 --- /dev/null +++ b/pkg/builder/nodebuilder/kubernetes.go @@ -0,0 +1,427 @@ +// Copyright © 2019 The OpenEBS Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nodebuilder + +import ( + "encoding/json" + + client "github.com/openebs/lib-csi/pkg/common/kubernetes/client" + apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// getClientsetFn is a typed function that +// abstracts fetching of internal clientset +type getClientsetFn func() (clientset *clientset.Clientset, err error) + +// getClientsetFromPathFn is a typed function that +// abstracts fetching of clientset from kubeConfigPath +type getClientsetForPathFn func(kubeConfigPath string) ( + clientset *clientset.Clientset, + err error, +) + +// createFn is a typed function that abstracts +// creating lvm node instance +type createFn func( + cs *clientset.Clientset, + upgradeResultObj *apis.LVMNode, + namespace string, +) (*apis.LVMNode, error) + +// getFn is a typed function that abstracts +// fetching a lvm node instance +type getFn func( + cli *clientset.Clientset, + name, + namespace string, + opts metav1.GetOptions, +) (*apis.LVMNode, error) + +// listFn is a typed function that abstracts +// listing of lvm node instances +type listFn func( + cli *clientset.Clientset, + namespace string, + opts metav1.ListOptions, +) (*apis.LVMNodeList, error) + +// delFn is a typed function that abstracts +// deleting a lvm node instance +type delFn func( + cli *clientset.Clientset, + name, + namespace string, + opts *metav1.DeleteOptions, +) error + +// updateFn is a typed function that abstracts +// updating lvm node instance +type updateFn func( + cs *clientset.Clientset, + node *apis.LVMNode, + namespace string, +) (*apis.LVMNode, error) + +// Kubeclient enables kubernetes API operations +// on lvm node instance +type Kubeclient struct { + // clientset refers to lvm node's + // clientset that will be responsible to + // make kubernetes API calls + clientset *clientset.Clientset + + kubeConfigPath string + + // namespace holds the namespace on which + // kubeclient has to operate + namespace string + + // functions useful during mocking + getClientset getClientsetFn + getClientsetForPath getClientsetForPathFn + get getFn + list listFn + del delFn + create createFn + update updateFn +} + +// KubeclientBuildOption defines the abstraction +// to build a kubeclient instance +type KubeclientBuildOption func(*Kubeclient) + +// defaultGetClientset is the default implementation to +// get kubernetes clientset instance +func defaultGetClientset() (clients *clientset.Clientset, err error) { + + config, err := client.GetConfig(client.New()) + if err != nil { + return nil, err + } + + return clientset.NewForConfig(config) + +} + +// defaultGetClientsetForPath is the default implementation to +// get kubernetes clientset instance based on the given +// kubeconfig path +func defaultGetClientsetForPath( + kubeConfigPath string, +) (clients *clientset.Clientset, err error) { + config, err := client.GetConfig( + client.New(client.WithKubeConfigPath(kubeConfigPath))) + if err != nil { + return nil, err + } + + return clientset.NewForConfig(config) +} + +// defaultGet is the default implementation to get +// a lvm node instance in kubernetes cluster +func defaultGet( + cli *clientset.Clientset, + name, namespace string, + opts metav1.GetOptions, +) (*apis.LVMNode, error) { + return cli.LocalV1alpha1(). + LVMNodes(namespace). + Get(name, opts) +} + +// defaultList is the default implementation to list +// lvm node instances in kubernetes cluster +func defaultList( + cli *clientset.Clientset, + namespace string, + opts metav1.ListOptions, +) (*apis.LVMNodeList, error) { + return cli.LocalV1alpha1(). + LVMNodes(namespace). + List(opts) +} + +// defaultCreate is the default implementation to delete +// a lvm node instance in kubernetes cluster +func defaultDel( + cli *clientset.Clientset, + name, namespace string, + opts *metav1.DeleteOptions, +) error { + deletePropagation := metav1.DeletePropagationForeground + opts.PropagationPolicy = &deletePropagation + err := cli.LocalV1alpha1(). + LVMNodes(namespace). + Delete(name, opts) + return err +} + +// defaultCreate is the default implementation to create +// a lvm node instance in kubernetes cluster +func defaultCreate( + cli *clientset.Clientset, + node *apis.LVMNode, + namespace string, +) (*apis.LVMNode, error) { + return cli.LocalV1alpha1(). + LVMNodes(namespace). + Create(node) +} + +// defaultUpdate is the default implementation to update +// a lvm node instance in kubernetes cluster +func defaultUpdate( + cli *clientset.Clientset, + node *apis.LVMNode, + namespace string, +) (*apis.LVMNode, error) { + return cli.LocalV1alpha1(). + LVMNodes(namespace). + Update(node) +} + +// withDefaults sets the default options +// of kubeclient instance +func (k *Kubeclient) withDefaults() { + if k.getClientset == nil { + k.getClientset = defaultGetClientset + } + if k.getClientsetForPath == nil { + k.getClientsetForPath = defaultGetClientsetForPath + } + if k.get == nil { + k.get = defaultGet + } + if k.list == nil { + k.list = defaultList + } + if k.del == nil { + k.del = defaultDel + } + if k.create == nil { + k.create = defaultCreate + } + if k.update == nil { + k.update = defaultUpdate + } +} + +// WithClientSet sets the kubernetes client against +// the kubeclient instance +func WithClientSet(c *clientset.Clientset) KubeclientBuildOption { + return func(k *Kubeclient) { + k.clientset = c + } +} + +// WithNamespace sets the kubernetes client against +// the provided namespace +func WithNamespace(namespace string) KubeclientBuildOption { + return func(k *Kubeclient) { + k.namespace = namespace + } +} + +// WithNamespace sets the provided namespace +// against this Kubeclient instance +func (k *Kubeclient) WithNamespace(namespace string) *Kubeclient { + k.namespace = namespace + return k +} + +// WithKubeConfigPath sets the kubernetes client +// against the provided path +func WithKubeConfigPath(path string) KubeclientBuildOption { + return func(k *Kubeclient) { + k.kubeConfigPath = path + } +} + +// NewKubeclient returns a new instance of +// kubeclient meant for lvm node operations +func NewKubeclient(opts ...KubeclientBuildOption) *Kubeclient { + k := &Kubeclient{} + for _, o := range opts { + o(k) + } + + k.withDefaults() + return k +} + +func (k *Kubeclient) getClientsetForPathOrDirect() ( + *clientset.Clientset, + error, +) { + if k.kubeConfigPath != "" { + return k.getClientsetForPath(k.kubeConfigPath) + } + + return k.getClientset() +} + +// getClientOrCached returns either a new instance +// of kubernetes client or its cached copy +func (k *Kubeclient) getClientOrCached() (*clientset.Clientset, error) { + if k.clientset != nil { + return k.clientset, nil + } + + c, err := k.getClientsetForPathOrDirect() + if err != nil { + return nil, + errors.Wrapf( + err, + "failed to get clientset", + ) + } + + k.clientset = c + return k.clientset, nil +} + +// Create creates a lvm node instance +// in kubernetes cluster +func (k *Kubeclient) Create(node *apis.LVMNode) (*apis.LVMNode, error) { + if node == nil { + return nil, + errors.New( + "failed to create lvm node: nil node object", + ) + } + cs, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to create lvm node {%s} in namespace {%s}", + node.Name, + k.namespace, + ) + } + + return k.create(cs, node, k.namespace) +} + +// Get returns lvm node object for given name +func (k *Kubeclient) Get( + name string, + opts metav1.GetOptions, +) (*apis.LVMNode, error) { + if name == "" { + return nil, + errors.New( + "failed to get lvm node: missing lvm node name", + ) + } + + cli, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to get lvm node {%s} in namespace {%s}", + name, + k.namespace, + ) + } + + return k.get(cli, name, k.namespace, opts) +} + +// GetRaw returns lvm node instance +// in bytes +func (k *Kubeclient) GetRaw( + name string, + opts metav1.GetOptions, +) ([]byte, error) { + if name == "" { + return nil, errors.New( + "failed to get raw lvm node: missing node name", + ) + } + csiv, err := k.Get(name, opts) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to get lvm node {%s} in namespace {%s}", + name, + k.namespace, + ) + } + + return json.Marshal(csiv) +} + +// List returns a list of lvm node +// instances present in kubernetes cluster +func (k *Kubeclient) List(opts metav1.ListOptions) (*apis.LVMNodeList, error) { + cli, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to list lvm nodes in namespace {%s}", + k.namespace, + ) + } + + return k.list(cli, k.namespace, opts) +} + +// Delete deletes the lvm node from +// kubernetes +func (k *Kubeclient) Delete(name string) error { + if name == "" { + return errors.New( + "failed to delete lvmnode: missing node name", + ) + } + cli, err := k.getClientOrCached() + if err != nil { + return errors.Wrapf( + err, + "failed to delete lvmnode {%s} in namespace {%s}", + name, + k.namespace, + ) + } + + return k.del(cli, name, k.namespace, &metav1.DeleteOptions{}) +} + +// Update updates this lvm node instance +// against kubernetes cluster +func (k *Kubeclient) Update(node *apis.LVMNode) (*apis.LVMNode, error) { + if node == nil { + return nil, + errors.New( + "failed to update lvmnode: nil node object", + ) + } + + cs, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to update lvmnode {%s} in namespace {%s}", + node.Name, + node.Namespace, + ) + } + + return k.update(cs, node, k.namespace) +} diff --git a/pkg/builder/nodebuilder/node.go b/pkg/builder/nodebuilder/node.go new file mode 100644 index 00000000..2e4a0802 --- /dev/null +++ b/pkg/builder/nodebuilder/node.go @@ -0,0 +1,122 @@ +/* +Copyright 2019 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodebuilder + +import ( + apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Builder is the builder object for LVMNode +type Builder struct { + node *LVMNode + errs []error +} + +// LVMNode is a wrapper over +// LVMNode API instance +type LVMNode struct { + // LVMVolume object + Object *apis.LVMNode +} + +// From returns a new instance of +// lvm volume +func From(node *apis.LVMNode) *LVMNode { + return &LVMNode{ + Object: node, + } +} + +// NewBuilder returns new instance of Builder +func NewBuilder() *Builder { + return &Builder{ + node: &LVMNode{ + Object: &apis.LVMNode{}, + }, + } +} + +// BuildFrom returns new instance of Builder +// from the provided api instance +func BuildFrom(node *apis.LVMNode) *Builder { + if node == nil { + b := NewBuilder() + b.errs = append( + b.errs, + errors.New("failed to build lvm node object: nil node"), + ) + return b + } + return &Builder{ + node: &LVMNode{ + Object: node, + }, + } +} + +// WithNamespace sets the namespace of LVMNode +func (b *Builder) WithNamespace(namespace string) *Builder { + if namespace == "" { + b.errs = append( + b.errs, + errors.New( + "failed to build lvm node object: missing namespace", + ), + ) + return b + } + b.node.Object.Namespace = namespace + return b +} + +// WithName sets the name of LVMNode +func (b *Builder) WithName(name string) *Builder { + if name == "" { + b.errs = append( + b.errs, + errors.New( + "failed to build lvm node object: missing name", + ), + ) + return b + } + b.node.Object.Name = name + return b +} + +// WithVolumeGroups sets the volume groups of LVMNode +func (b *Builder) WithVolumeGroups(vgs []apis.VolumeGroup) *Builder { + b.node.Object.VolumeGroups = vgs + return b +} + +// WithOwnerReferences sets the owner references of LVMNode +func (b *Builder) WithOwnerReferences(ownerRefs ...metav1.OwnerReference) *Builder { + b.node.Object.OwnerReferences = ownerRefs + return b +} + +// Build returns LVMNode API object +func (b *Builder) Build() (*apis.LVMNode, error) { + if len(b.errs) > 0 { + return nil, errors.Errorf("%+v", b.errs) + } + + return b.node.Object, nil +} diff --git a/pkg/driver/agent.go b/pkg/driver/agent.go index 5d45c5d5..e7cd3466 100644 --- a/pkg/driver/agent.go +++ b/pkg/driver/agent.go @@ -28,6 +28,7 @@ import ( apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" "github.com/openebs/lvm-localpv/pkg/builder/volbuilder" "github.com/openebs/lvm-localpv/pkg/lvm" + "github.com/openebs/lvm-localpv/pkg/mgmt/lvmnode" "github.com/openebs/lvm-localpv/pkg/mgmt/snapshot" "github.com/openebs/lvm-localpv/pkg/mgmt/volume" "golang.org/x/net/context" @@ -53,6 +54,14 @@ func NewNode(d *CSIDriver) csi.NodeServer { // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() + // start the lvm node resource watcher + go func() { + err := lvmnode.Start(&ControllerMutex, stopCh) + if err != nil { + klog.Fatalf("Failed to start LVM node controller: %s", err.Error()) + } + }() + // start the lvmvolume watcher go func() { err := volume.Start(&ControllerMutex, stopCh) diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 4f16282c..ec14716d 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -22,11 +22,22 @@ import ( "strings" "time" + k8sapi "github.com/openebs/lib-csi/pkg/client/k8s" + "github.com/openebs/lib-csi/pkg/common/helpers" + clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" + informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/runtime/signals" + "github.com/container-storage-interface/spec/lib/go/csi" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" k8serror "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" "k8s.io/klog" errors "github.com/openebs/lib-csi/pkg/common/errors" @@ -52,15 +63,26 @@ const ( type controller struct { driver *CSIDriver capabilities []*csi.ControllerServiceCapability + + indexedLabel string + + k8sNodeInformer cache.SharedIndexInformer + lvmNodeInformer cache.SharedIndexInformer } // NewController returns a new instance // of CSI controller func NewController(d *CSIDriver) csi.ControllerServer { - return &controller{ + ctrl := &controller{ driver: d, capabilities: newControllerCapabilities(), } + + if err := ctrl.init(); err != nil { + klog.Fatalf("init controller: %v", err) + } + + return ctrl } // SupportedVolumeCapabilityAccessModes contains the list of supported access @@ -148,6 +170,50 @@ func waitForLVMVolume(ctx context.Context, return vol, false, status.Error(codes.Aborted, errMsg) } +func (cs *controller) init() error { + cfg, err := k8sapi.Config().Get() + if err != nil { + return errors.Wrapf(err, "failed to build kubeconfig") + } + + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "failed to build k8s clientset") + } + + openebsClient, err := clientset.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "failed to build openebs clientset") + } + + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, 0) + openebsInformerfactory := informers.NewSharedInformerFactoryWithOptions(openebsClient, + 0, informers.WithNamespace(lvm.LvmNamespace)) + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + cs.k8sNodeInformer = kubeInformerFactory.Core().V1().Nodes().Informer() + cs.lvmNodeInformer = openebsInformerfactory.Local().V1alpha1().LVMNodes().Informer() + + if err = cs.lvmNodeInformer.AddIndexers(map[string]cache.IndexFunc{ + LabelIndexName(cs.indexedLabel): LabelIndexFunc(cs.indexedLabel), + }); err != nil { + return errors.Wrapf(err, "failed to add index on label %v", cs.indexedLabel) + } + + go cs.k8sNodeInformer.Run(stopCh) + go cs.lvmNodeInformer.Run(stopCh) + + // wait for all the caches to be populated. + klog.Info("waiting for k8s & lvm node informer caches to be synced") + cache.WaitForCacheSync(stopCh, + cs.k8sNodeInformer.HasSynced, + cs.lvmNodeInformer.HasSynced) + klog.Info("synced k8s & lvm node informer caches") + return nil +} + // CreateLVMVolume create new lvm volume for csi volume request func CreateLVMVolume(ctx context.Context, req *csi.CreateVolumeRequest, params *VolumeParams) (*lvmapi.LVMVolume, error) { @@ -619,7 +685,7 @@ func (cs *controller) ControllerPublishVolume( } // GetCapacity return the capacity of the -// given volume +// given node topology segment. // // This implements csi.ControllerServer func (cs *controller) GetCapacity( @@ -627,7 +693,89 @@ func (cs *controller) GetCapacity( req *csi.GetCapacityRequest, ) (*csi.GetCapacityResponse, error) { - return nil, status.Error(codes.Unimplemented, "") + var segments map[string]string + if topology := req.GetAccessibleTopology(); topology != nil { + segments = topology.Segments + } + nodeNames, err := cs.filterNodesByTopology(segments) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + lvmNodesCache := cs.lvmNodeInformer.GetIndexer() + params := req.GetParameters() + vgParam := helpers.GetInsensitiveParameter(¶ms, "volgroup") + + var availableCapacity int64 + for _, nodeName := range nodeNames { + v, exists, err := lvmNodesCache.GetByKey(lvm.LvmNamespace + "/" + nodeName) + if err != nil { + klog.Warning("unexpected error after querying the lvmNode informer cache") + continue + } + if !exists { + continue + } + lvmNode := v.(*lvmapi.LVMNode) + // rather than summing all free capacity, we are calculating maximum + // lv size that gets fit in given vg. + // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1472-storage-capacity-tracking#available-capacity-vs-maximum-volume-size & + // https://github.com/container-storage-interface/spec/issues/432 for more details + for _, vg := range lvmNode.VolumeGroups { + if vg.Name != vgParam { + continue + } + freeCapacity := vg.Free.Value() + if availableCapacity < freeCapacity { + availableCapacity = freeCapacity + } + } + } + + return &csi.GetCapacityResponse{ + AvailableCapacity: availableCapacity, + }, nil +} + +func (cs *controller) filterNodesByTopology(segments map[string]string) ([]string, error) { + nodesCache := cs.k8sNodeInformer.GetIndexer() + if len(segments) == 0 { + return nodesCache.ListKeys(), nil + } + + filterNodes := func(vs []interface{}) ([]string, error) { + var names []string + selector := labels.SelectorFromSet(segments) + for _, v := range vs { + meta, err := apimeta.Accessor(v) + if err != nil { + return nil, err + } + if selector.Matches(labels.Set(meta.GetLabels())) { + names = append(names, meta.GetName()) + } + } + return names, nil + } + + // first see if we need to filter the informer cache by indexed label, + // so that we don't need to iterate over all the nodes for performance + // reasons in large cluster. + indexName := LabelIndexName(cs.indexedLabel) + if _, ok := nodesCache.GetIndexers()[indexName]; !ok { + // run through all the nodes in case indexer doesn't exists. + return filterNodes(nodesCache.List()) + } + + if segValue, ok := segments[cs.indexedLabel]; ok { + vs, err := nodesCache.ByIndex(indexName, segValue) + if err != nil { + return nil, errors.Wrapf(err, "query indexed store indexName=%v indexKey=%v", + indexName, segValue) + } + return filterNodes(vs) + } + return filterNodes(nodesCache.List()) } // ListVolumes lists all the volumes @@ -708,6 +856,7 @@ func newControllerCapabilities() []*csi.ControllerServiceCapability { csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, + csi.ControllerServiceCapability_RPC_GET_CAPACITY, } { capabilities = append(capabilities, fromType(cap)) } @@ -795,3 +944,24 @@ func validateSnapshotRequest(req *csi.CreateSnapshotRequest) error { } return nil } + +// LabelIndexName add prefix for label index. +func LabelIndexName(label string) string { + return "l:" + label +} + +// LabelIndexFunc defines index values for given label. +func LabelIndexFunc(label string) cache.IndexFunc { + return func(obj interface{}) ([]string, error) { + meta, err := apimeta.Accessor(obj) + if err != nil { + return nil, fmt.Errorf( + "k8s api object type (%T) doesn't implements metav1.Object interface: %v", obj, err) + } + var vs []string + if v, ok := meta.GetLabels()[label]; ok { + vs = append(vs, v) + } + return vs, nil + } +} diff --git a/pkg/equality/semantic.go b/pkg/equality/semantic.go new file mode 100644 index 00000000..3da98f29 --- /dev/null +++ b/pkg/equality/semantic.go @@ -0,0 +1,55 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package equality + +import ( + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" +) + +// Since we are using quite older version of k8s.io/* libraries, we need take up +// their upgrades as part of separate pull request. +// +// Copied from https://github.com/kubernetes/apimachinery/blob/c93b0f84892eb6bcbc80b312ae70729d8168bc7e/pkg/api/equality/semantic.go +// More details at - https://github.com/kubernetes/apimachinery/issues/75 + +// Semantic can do semantic deep equality checks for api objects. +// Example: apiequality.Semantic.DeepEqual(aPod, aPodWithNonNilButEmptyMaps) == true +var Semantic = conversion.EqualitiesOrDie( + func(a, b resource.Quantity) bool { + // Ignore formatting, only care that numeric value stayed the same. + // TODO: if we decide it's important, it should be safe to start comparing the format. + // + // Uninitialized quantities are equivalent to 0 quantities. + return a.Cmp(b) == 0 + }, + func(a, b metav1.MicroTime) bool { + return a.UTC() == b.UTC() + }, + func(a, b metav1.Time) bool { + return a.UTC() == b.UTC() + }, + func(a, b labels.Selector) bool { + return a.String() == b.String() + }, + func(a, b fields.Selector) bool { + return a.String() == b.String() + }, +) diff --git a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvm_client.go b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvm_client.go index 9eabcf68..6642add4 100644 --- a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvm_client.go +++ b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvm_client.go @@ -28,6 +28,10 @@ type FakeLocalV1alpha1 struct { *testing.Fake } +func (c *FakeLocalV1alpha1) LVMNodes(namespace string) v1alpha1.LVMNodeInterface { + return &FakeLVMNodes{c, namespace} +} + func (c *FakeLocalV1alpha1) LVMSnapshots(namespace string) v1alpha1.LVMSnapshotInterface { return &FakeLVMSnapshots{c, namespace} } diff --git a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvmnode.go b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvmnode.go new file mode 100644 index 00000000..cf8424ed --- /dev/null +++ b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvmnode.go @@ -0,0 +1,128 @@ +/* +Copyright 2021 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeLVMNodes implements LVMNodeInterface +type FakeLVMNodes struct { + Fake *FakeLocalV1alpha1 + ns string +} + +var lvmnodesResource = schema.GroupVersionResource{Group: "local.openebs.io", Version: "v1alpha1", Resource: "lvmnodes"} + +var lvmnodesKind = schema.GroupVersionKind{Group: "local.openebs.io", Version: "v1alpha1", Kind: "LVMNode"} + +// Get takes name of the lVMNode, and returns the corresponding lVMNode object, and an error if there is any. +func (c *FakeLVMNodes) Get(name string, options v1.GetOptions) (result *v1alpha1.LVMNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(lvmnodesResource, c.ns, name), &v1alpha1.LVMNode{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LVMNode), err +} + +// List takes label and field selectors, and returns the list of LVMNodes that match those selectors. +func (c *FakeLVMNodes) List(opts v1.ListOptions) (result *v1alpha1.LVMNodeList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(lvmnodesResource, lvmnodesKind, c.ns, opts), &v1alpha1.LVMNodeList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.LVMNodeList{ListMeta: obj.(*v1alpha1.LVMNodeList).ListMeta} + for _, item := range obj.(*v1alpha1.LVMNodeList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested lVMNodes. +func (c *FakeLVMNodes) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(lvmnodesResource, c.ns, opts)) + +} + +// Create takes the representation of a lVMNode and creates it. Returns the server's representation of the lVMNode, and an error, if there is any. +func (c *FakeLVMNodes) Create(lVMNode *v1alpha1.LVMNode) (result *v1alpha1.LVMNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(lvmnodesResource, c.ns, lVMNode), &v1alpha1.LVMNode{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LVMNode), err +} + +// Update takes the representation of a lVMNode and updates it. Returns the server's representation of the lVMNode, and an error, if there is any. +func (c *FakeLVMNodes) Update(lVMNode *v1alpha1.LVMNode) (result *v1alpha1.LVMNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(lvmnodesResource, c.ns, lVMNode), &v1alpha1.LVMNode{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LVMNode), err +} + +// Delete takes name of the lVMNode and deletes it. Returns an error if one occurs. +func (c *FakeLVMNodes) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(lvmnodesResource, c.ns, name), &v1alpha1.LVMNode{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeLVMNodes) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(lvmnodesResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.LVMNodeList{}) + return err +} + +// Patch applies the patch and returns the patched lVMNode. +func (c *FakeLVMNodes) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.LVMNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(lvmnodesResource, c.ns, name, pt, data, subresources...), &v1alpha1.LVMNode{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LVMNode), err +} diff --git a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/generated_expansion.go b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/generated_expansion.go index 794b878d..7e7d8b46 100644 --- a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/generated_expansion.go +++ b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/generated_expansion.go @@ -18,6 +18,8 @@ limitations under the License. package v1alpha1 +type LVMNodeExpansion interface{} + type LVMSnapshotExpansion interface{} type LVMVolumeExpansion interface{} diff --git a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvm_client.go b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvm_client.go index e7c713ca..85a70fb3 100644 --- a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvm_client.go +++ b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvm_client.go @@ -26,6 +26,7 @@ import ( type LocalV1alpha1Interface interface { RESTClient() rest.Interface + LVMNodesGetter LVMSnapshotsGetter LVMVolumesGetter } @@ -35,6 +36,10 @@ type LocalV1alpha1Client struct { restClient rest.Interface } +func (c *LocalV1alpha1Client) LVMNodes(namespace string) LVMNodeInterface { + return newLVMNodes(c, namespace) +} + func (c *LocalV1alpha1Client) LVMSnapshots(namespace string) LVMSnapshotInterface { return newLVMSnapshots(c, namespace) } diff --git a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvmnode.go b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvmnode.go new file mode 100644 index 00000000..e9b560d2 --- /dev/null +++ b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvmnode.go @@ -0,0 +1,174 @@ +/* +Copyright 2021 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "time" + + v1alpha1 "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + scheme "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// LVMNodesGetter has a method to return a LVMNodeInterface. +// A group's client should implement this interface. +type LVMNodesGetter interface { + LVMNodes(namespace string) LVMNodeInterface +} + +// LVMNodeInterface has methods to work with LVMNode resources. +type LVMNodeInterface interface { + Create(*v1alpha1.LVMNode) (*v1alpha1.LVMNode, error) + Update(*v1alpha1.LVMNode) (*v1alpha1.LVMNode, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.LVMNode, error) + List(opts v1.ListOptions) (*v1alpha1.LVMNodeList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.LVMNode, err error) + LVMNodeExpansion +} + +// lVMNodes implements LVMNodeInterface +type lVMNodes struct { + client rest.Interface + ns string +} + +// newLVMNodes returns a LVMNodes +func newLVMNodes(c *LocalV1alpha1Client, namespace string) *lVMNodes { + return &lVMNodes{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the lVMNode, and returns the corresponding lVMNode object, and an error if there is any. +func (c *lVMNodes) Get(name string, options v1.GetOptions) (result *v1alpha1.LVMNode, err error) { + result = &v1alpha1.LVMNode{} + err = c.client.Get(). + Namespace(c.ns). + Resource("lvmnodes"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of LVMNodes that match those selectors. +func (c *lVMNodes) List(opts v1.ListOptions) (result *v1alpha1.LVMNodeList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.LVMNodeList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("lvmnodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested lVMNodes. +func (c *lVMNodes) Watch(opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("lvmnodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch() +} + +// Create takes the representation of a lVMNode and creates it. Returns the server's representation of the lVMNode, and an error, if there is any. +func (c *lVMNodes) Create(lVMNode *v1alpha1.LVMNode) (result *v1alpha1.LVMNode, err error) { + result = &v1alpha1.LVMNode{} + err = c.client.Post(). + Namespace(c.ns). + Resource("lvmnodes"). + Body(lVMNode). + Do(). + Into(result) + return +} + +// Update takes the representation of a lVMNode and updates it. Returns the server's representation of the lVMNode, and an error, if there is any. +func (c *lVMNodes) Update(lVMNode *v1alpha1.LVMNode) (result *v1alpha1.LVMNode, err error) { + result = &v1alpha1.LVMNode{} + err = c.client.Put(). + Namespace(c.ns). + Resource("lvmnodes"). + Name(lVMNode.Name). + Body(lVMNode). + Do(). + Into(result) + return +} + +// Delete takes name of the lVMNode and deletes it. Returns an error if one occurs. +func (c *lVMNodes) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("lvmnodes"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *lVMNodes) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + var timeout time.Duration + if listOptions.TimeoutSeconds != nil { + timeout = time.Duration(*listOptions.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("lvmnodes"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Timeout(timeout). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched lVMNode. +func (c *lVMNodes) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.LVMNode, err error) { + result = &v1alpha1.LVMNode{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("lvmnodes"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/generated/informer/externalversions/generic.go b/pkg/generated/informer/externalversions/generic.go index df392d41..2c126da7 100644 --- a/pkg/generated/informer/externalversions/generic.go +++ b/pkg/generated/informer/externalversions/generic.go @@ -53,6 +53,8 @@ func (f *genericInformer) Lister() cache.GenericLister { func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) { switch resource { // Group=local.openebs.io, Version=v1alpha1 + case v1alpha1.SchemeGroupVersion.WithResource("lvmnodes"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Local().V1alpha1().LVMNodes().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("lvmsnapshots"): return &genericInformer{resource: resource.GroupResource(), informer: f.Local().V1alpha1().LVMSnapshots().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("lvmvolumes"): diff --git a/pkg/generated/informer/externalversions/lvm/v1alpha1/interface.go b/pkg/generated/informer/externalversions/lvm/v1alpha1/interface.go index f72897a3..c2519c5c 100644 --- a/pkg/generated/informer/externalversions/lvm/v1alpha1/interface.go +++ b/pkg/generated/informer/externalversions/lvm/v1alpha1/interface.go @@ -24,6 +24,8 @@ import ( // Interface provides access to all the informers in this group version. type Interface interface { + // LVMNodes returns a LVMNodeInformer. + LVMNodes() LVMNodeInformer // LVMSnapshots returns a LVMSnapshotInformer. LVMSnapshots() LVMSnapshotInformer // LVMVolumes returns a LVMVolumeInformer. @@ -41,6 +43,11 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} } +// LVMNodes returns a LVMNodeInformer. +func (v *version) LVMNodes() LVMNodeInformer { + return &lVMNodeInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // LVMSnapshots returns a LVMSnapshotInformer. func (v *version) LVMSnapshots() LVMSnapshotInformer { return &lVMSnapshotInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/generated/informer/externalversions/lvm/v1alpha1/lvmnode.go b/pkg/generated/informer/externalversions/lvm/v1alpha1/lvmnode.go new file mode 100644 index 00000000..4773dcaf --- /dev/null +++ b/pkg/generated/informer/externalversions/lvm/v1alpha1/lvmnode.go @@ -0,0 +1,89 @@ +/* +Copyright 2021 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + lvmv1alpha1 "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + internalclientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" + internalinterfaces "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions/internalinterfaces" + v1alpha1 "github.com/openebs/lvm-localpv/pkg/generated/lister/lvm/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// LVMNodeInformer provides access to a shared informer and lister for +// LVMNodes. +type LVMNodeInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.LVMNodeLister +} + +type lVMNodeInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewLVMNodeInformer constructs a new informer for LVMNode type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewLVMNodeInformer(client internalclientset.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredLVMNodeInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredLVMNodeInformer constructs a new informer for LVMNode type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredLVMNodeInformer(client internalclientset.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.LocalV1alpha1().LVMNodes(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.LocalV1alpha1().LVMNodes(namespace).Watch(options) + }, + }, + &lvmv1alpha1.LVMNode{}, + resyncPeriod, + indexers, + ) +} + +func (f *lVMNodeInformer) defaultInformer(client internalclientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredLVMNodeInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *lVMNodeInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&lvmv1alpha1.LVMNode{}, f.defaultInformer) +} + +func (f *lVMNodeInformer) Lister() v1alpha1.LVMNodeLister { + return v1alpha1.NewLVMNodeLister(f.Informer().GetIndexer()) +} diff --git a/pkg/generated/lister/lvm/v1alpha1/expansion_generated.go b/pkg/generated/lister/lvm/v1alpha1/expansion_generated.go index 0513aa49..66be1551 100644 --- a/pkg/generated/lister/lvm/v1alpha1/expansion_generated.go +++ b/pkg/generated/lister/lvm/v1alpha1/expansion_generated.go @@ -18,6 +18,14 @@ limitations under the License. package v1alpha1 +// LVMNodeListerExpansion allows custom methods to be added to +// LVMNodeLister. +type LVMNodeListerExpansion interface{} + +// LVMNodeNamespaceListerExpansion allows custom methods to be added to +// LVMNodeNamespaceLister. +type LVMNodeNamespaceListerExpansion interface{} + // LVMSnapshotListerExpansion allows custom methods to be added to // LVMSnapshotLister. type LVMSnapshotListerExpansion interface{} diff --git a/pkg/generated/lister/lvm/v1alpha1/lvmnode.go b/pkg/generated/lister/lvm/v1alpha1/lvmnode.go new file mode 100644 index 00000000..6058890b --- /dev/null +++ b/pkg/generated/lister/lvm/v1alpha1/lvmnode.go @@ -0,0 +1,94 @@ +/* +Copyright 2021 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// LVMNodeLister helps list LVMNodes. +type LVMNodeLister interface { + // List lists all LVMNodes in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.LVMNode, err error) + // LVMNodes returns an object that can list and get LVMNodes. + LVMNodes(namespace string) LVMNodeNamespaceLister + LVMNodeListerExpansion +} + +// lVMNodeLister implements the LVMNodeLister interface. +type lVMNodeLister struct { + indexer cache.Indexer +} + +// NewLVMNodeLister returns a new LVMNodeLister. +func NewLVMNodeLister(indexer cache.Indexer) LVMNodeLister { + return &lVMNodeLister{indexer: indexer} +} + +// List lists all LVMNodes in the indexer. +func (s *lVMNodeLister) List(selector labels.Selector) (ret []*v1alpha1.LVMNode, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.LVMNode)) + }) + return ret, err +} + +// LVMNodes returns an object that can list and get LVMNodes. +func (s *lVMNodeLister) LVMNodes(namespace string) LVMNodeNamespaceLister { + return lVMNodeNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// LVMNodeNamespaceLister helps list and get LVMNodes. +type LVMNodeNamespaceLister interface { + // List lists all LVMNodes in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1alpha1.LVMNode, err error) + // Get retrieves the LVMNode from the indexer for a given namespace and name. + Get(name string) (*v1alpha1.LVMNode, error) + LVMNodeNamespaceListerExpansion +} + +// lVMNodeNamespaceLister implements the LVMNodeNamespaceLister +// interface. +type lVMNodeNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all LVMNodes in the indexer for a given namespace. +func (s lVMNodeNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.LVMNode, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.LVMNode)) + }) + return ret, err +} + +// Get retrieves the LVMNode from the indexer for a given namespace and name. +func (s lVMNodeNamespaceLister) Get(name string) (*v1alpha1.LVMNode, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("lvmnode"), name) + } + return obj.(*v1alpha1.LVMNode), nil +} diff --git a/pkg/lvm/lvm_util.go b/pkg/lvm/lvm_util.go index ba09493c..4305a0d3 100644 --- a/pkg/lvm/lvm_util.go +++ b/pkg/lvm/lvm_util.go @@ -17,13 +17,16 @@ limitations under the License. package lvm import ( + "encoding/json" "fmt" "os" "os/exec" + "strconv" "strings" apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/klog" ) @@ -36,6 +39,8 @@ const ( // lvm command related constants const ( VGCreate = "vgcreate" + VGList = "vgs" + LVCreate = "lvcreate" LVRemove = "lvremove" LVExtend = "lvextend" @@ -289,3 +294,82 @@ func DestroySnapshot(snap *apis.LVMSnapshot) error { func getLVMSnapName(snapName string) string { return strings.TrimPrefix(snapName, "snapshot-") } + +func decodeVgsJSON(raw []byte) ([]apis.VolumeGroup, error) { + output := &struct { + Report []struct { + VolumeGroups []map[string]string `json:"vg"` + } `json:"report"` + }{} + var err error + if err = json.Unmarshal(raw, output); err != nil { + return nil, err + } + + if len(output.Report) != 1 { + return nil, fmt.Errorf("expected exactly one lvm report") + } + + items := output.Report[0].VolumeGroups + vgs := make([]apis.VolumeGroup, 0, len(items)) + for _, item := range items { + var vg apis.VolumeGroup + if vg, err = parseVolumeGroup(item); err != nil { + return vgs, err + } + vgs = append(vgs, vg) + } + return vgs, nil +} + +func parseVolumeGroup(m map[string]string) (apis.VolumeGroup, error) { + var vg apis.VolumeGroup + vg.Name = m["vg_name"] + vg.UUID = m["vg_uuid"] + + int32Map := map[string]*int32{ + "pv_count": &vg.PVCount, + "lv_count": &vg.LVCount, + } + for key, value := range int32Map { + count, err := strconv.Atoi(m[key]) + if err != nil { + err = fmt.Errorf("invalid format of %v=%v for vg %v: %v", key, m[key], vg.Name, err) + } + *value = int32(count) + } + + resQuantityMap := map[string]*resource.Quantity{ + "vg_size": &vg.Size, + "vg_free": &vg.Free, + } + + for key, value := range resQuantityMap { + sizeBytes, err := strconv.ParseInt( + strings.TrimSuffix(strings.ToLower(m[key]), "b"), + 10, 64) + if err != nil { + err = fmt.Errorf("invalid format of %v=%v for vg %v: %v", key, m[key], vg.Name, err) + } + quantity := resource.NewQuantity(sizeBytes, resource.BinarySI) + *value = *quantity // + } + return vg, nil +} + +// ListLVMVolumeGroup invokes `vgs` to list all the available volume +// groups in the node. +func ListLVMVolumeGroup() ([]apis.VolumeGroup, error) { + args := []string{ + "--options", "vg_all", + "--reportformat", "json", + "--units", "b", + } + cmd := exec.Command(VGList, args...) + output, err := cmd.CombinedOutput() + if err != nil { + klog.Errorf("lvm: list volume group cmd %v: %v", args, err) + return nil, err + } + return decodeVgsJSON(output) +} diff --git a/pkg/mgmt/lvmnode/builder.go b/pkg/mgmt/lvmnode/builder.go new file mode 100644 index 00000000..33ec1415 --- /dev/null +++ b/pkg/mgmt/lvmnode/builder.go @@ -0,0 +1,157 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package lvmnode + +import ( + "time" + + clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" + openebsScheme "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset/scheme" + informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" + listers "github.com/openebs/lvm-localpv/pkg/generated/lister/lvm/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" +) + +const controllerAgentName = "lvmnode-controller" + +// NodeController is the controller implementation for lvm node resources +type NodeController struct { + // kubeclientset is a standard kubernetes clientset + kubeclientset kubernetes.Interface + + // clientset is a openebs custom resource package generated for custom API group. + clientset clientset.Interface + + NodeLister listers.LVMNodeLister + + // NodeSynced is used for caches sync to get populated + NodeSynced cache.InformerSynced + + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder + + // pollInterval controls the polling frequency of syncing up the vg metadata. + pollInterval time.Duration + + // ownerRef is used to set the owner reference to lvmnode objects. + ownerRef metav1.OwnerReference +} + +// NodeControllerBuilder is the builder object for controller. +type NodeControllerBuilder struct { + NodeController *NodeController +} + +// NewNodeControllerBuilder returns an empty instance of controller builder. +func NewNodeControllerBuilder() *NodeControllerBuilder { + return &NodeControllerBuilder{ + NodeController: &NodeController{}, + } +} + +// withKubeClient fills kube client to controller object. +func (cb *NodeControllerBuilder) withKubeClient(ks kubernetes.Interface) *NodeControllerBuilder { + cb.NodeController.kubeclientset = ks + return cb +} + +// withOpenEBSClient fills openebs client to controller object. +func (cb *NodeControllerBuilder) withOpenEBSClient(cs clientset.Interface) *NodeControllerBuilder { + cb.NodeController.clientset = cs + return cb +} + +// withNodeLister fills Node lister to controller object. +func (cb *NodeControllerBuilder) withNodeLister(sl informers.SharedInformerFactory) *NodeControllerBuilder { + NodeInformer := sl.Local().V1alpha1().LVMNodes() + cb.NodeController.NodeLister = NodeInformer.Lister() + return cb +} + +// withNodeSynced adds object sync information in cache to controller object. +func (cb *NodeControllerBuilder) withNodeSynced(sl informers.SharedInformerFactory) *NodeControllerBuilder { + NodeInformer := sl.Local().V1alpha1().LVMNodes() + cb.NodeController.NodeSynced = NodeInformer.Informer().HasSynced + return cb +} + +// withWorkqueue adds workqueue to controller object. +func (cb *NodeControllerBuilder) withWorkqueueRateLimiting() *NodeControllerBuilder { + cb.NodeController.workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Node") + return cb +} + +// withRecorder adds recorder to controller object. +func (cb *NodeControllerBuilder) withRecorder(ks kubernetes.Interface) *NodeControllerBuilder { + klog.Infof("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ks.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + cb.NodeController.recorder = recorder + return cb +} + +// withEventHandler adds event handlers controller object. +func (cb *NodeControllerBuilder) withEventHandler(cvcInformerFactory informers.SharedInformerFactory) *NodeControllerBuilder { + cvcInformer := cvcInformerFactory.Local().V1alpha1().LVMNodes() + // Set up an event handler for when lvm node vg change. + // Note: rather than setting up the resync period at informer level, + // we are controlling the syncing based on pollInternal. See + // NodeController#Run func for more details. + cvcInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: cb.NodeController.addNode, + UpdateFunc: cb.NodeController.updateNode, + DeleteFunc: cb.NodeController.deleteNode, + }, 0) + return cb +} + +func (cb *NodeControllerBuilder) withPollInterval(interval time.Duration) *NodeControllerBuilder { + cb.NodeController.pollInterval = interval + return cb +} + +func (cb *NodeControllerBuilder) withOwnerReference(ownerRef metav1.OwnerReference) *NodeControllerBuilder { + cb.NodeController.ownerRef = ownerRef + return cb +} + +// Build returns a controller instance. +func (cb *NodeControllerBuilder) Build() (*NodeController, error) { + err := openebsScheme.AddToScheme(scheme.Scheme) + if err != nil { + return nil, err + } + return cb.NodeController, nil +} diff --git a/pkg/mgmt/lvmnode/lvmnode.go b/pkg/mgmt/lvmnode/lvmnode.go new file mode 100644 index 00000000..4892b053 --- /dev/null +++ b/pkg/mgmt/lvmnode/lvmnode.go @@ -0,0 +1,307 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package lvmnode + +import ( + "fmt" + "reflect" + "time" + + apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + "github.com/openebs/lvm-localpv/pkg/builder/nodebuilder" + "github.com/openebs/lvm-localpv/pkg/equality" + "github.com/openebs/lvm-localpv/pkg/lvm" + k8serror "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +func (c *NodeController) listLVMVolumeGroup() ([]apis.VolumeGroup, error) { + return lvm.ListLVMVolumeGroup() +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. +func (c *NodeController) syncHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + return c.syncNode(namespace, name) +} + +// syncNode is the function which tries to converge to a desired state for the +// LVMNode +func (c *NodeController) syncNode(namespace string, name string) error { + // Get the node resource with this namespace/name + cachedNode, err := c.NodeLister.LVMNodes(namespace).Get(name) + if err != nil && !k8serror.IsNotFound(err) { + return err + } + var node *apis.LVMNode + if cachedNode != nil { + node = cachedNode.DeepCopy() + } + + vgs, err := c.listLVMVolumeGroup() + if err != nil { + return err + } + + if node == nil { // if it doesn't exists, create lvm node object + if node, err = nodebuilder.NewBuilder(). + WithNamespace(namespace).WithName(name). + WithVolumeGroups(vgs). + WithOwnerReferences(c.ownerRef). + Build(); err != nil { + return err + } + + klog.Infof("lvm node controller: creating new node object for %+v", node) + if node, err = nodebuilder.NewKubeclient().WithNamespace(namespace).Create(node); err != nil { + return fmt.Errorf("create lvm node %s/%s: %v", namespace, name, err) + } + klog.Infof("lvm node controller: created node object %s/%s", namespace, name) + return nil + } + + // lvm node already exists check if we need to update it. + var updateRequired bool + // validate if owner reference updated. + if ownerRefs, req := c.isOwnerRefsUpdateRequired(node.OwnerReferences); req { + klog.Infof("lvm node controller: node owner references updated current=%+v, required=%+v", + node.OwnerReferences, ownerRefs) + node.OwnerReferences = ownerRefs + updateRequired = true + } + + // validate if node volume groups are upto date. + if !equality.Semantic.DeepEqual(node.VolumeGroups, vgs) { + klog.Infof("lvm node controller: node volume groups updated current=%+v, required=%+v", + node.VolumeGroups, vgs) + node.VolumeGroups = vgs + updateRequired = true + } + + if !updateRequired { + return nil + } + + klog.Infof("lvm node controller: updating node object with %+v", node) + if node, err = nodebuilder.NewKubeclient().WithNamespace(namespace).Update(node); err != nil { + return fmt.Errorf("update lvm node %s/%s: %v", namespace, name, err) + } + klog.Infof("lvm node controller: updated node object %s/%s", namespace, name) + + return nil +} + +// addNode is the add event handler for LVMNode +func (c *NodeController) addNode(obj interface{}) { + node, ok := obj.(*apis.LVMNode) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get node object %#v", obj)) + return + } + + klog.Infof("Got add event for lvm node %s/%s", node.Namespace, node.Name) + c.enqueueNode(node) +} + +// updateNode is the update event handler for LVMNode +func (c *NodeController) updateNode(oldObj, newObj interface{}) { + newNode, ok := newObj.(*apis.LVMNode) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get node object %#v", newNode)) + return + } + + klog.Infof("Got update event for lvm node %s/%s", newNode.Namespace, newNode.Name) + c.enqueueNode(newNode) +} + +// deleteNode is the delete event handler for LVMNode +func (c *NodeController) deleteNode(obj interface{}) { + node, ok := obj.(*apis.LVMNode) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return + } + node, ok = tombstone.Obj.(*apis.LVMNode) + if !ok { + runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a LVMNode %#v", obj)) + return + } + } + + klog.Infof("Got delete event for node %s/%s", node.Namespace, node.Name) + c.enqueueNode(node) +} + +// enqueueNode takes a LVMNode resource and converts it into a namespace/name +// string which is then put onto the work queue. This method should *not* be +// passed resources of any type other than LVMNode. +func (c *NodeController) enqueueNode(node *apis.LVMNode) { + // node must exists in openebs namespace & must equal to the node id. + if node.Namespace != lvm.LvmNamespace || + node.Name != lvm.NodeID { + klog.Warningf("skipping lvm node object %s/%s", node.Namespace, node.Name) + return + } + + key, err := cache.MetaNamespaceKeyFunc(node) + if err != nil { + runtime.HandleError(err) + return + } + c.workqueue.Add(key) +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. +func (c *NodeController) Run(threadiness int, stopCh <-chan struct{}) error { + defer runtime.HandleCrash() + defer c.workqueue.ShutDown() + + // Start the informer factories to begin populating the informer caches + klog.Info("Starting Node controller") + + // Wait for the k8s caches to be synced before starting workers + klog.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.NodeSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + klog.Info("Starting Node workers") + // Launch worker to process Node resources + // Threadiness will decide the number of workers you want to launch to process work items from queue + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + klog.Info("Started Node workers") + + timer := time.NewTimer(0) + defer timer.Stop() + for { + select { + case <-timer.C: + case <-stopCh: + klog.Info("Shutting down Node controller") + return nil + } + item := lvm.LvmNamespace + "/" + lvm.NodeID + c.workqueue.Add(item) // add the item to worker queue. + timer.Reset(c.pollInterval) + } +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (c *NodeController) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (c *NodeController) processNextWorkItem() bool { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.workqueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.workqueue.Forget(obj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the syncHandler, passing it the namespace/name string of the + // Node resource to be synced. + if err := c.syncHandler(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.workqueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(obj) + klog.Infof("Successfully synced '%s'", key) + return nil + }(obj) + + if err != nil { + runtime.HandleError(err) + return true + } + + return true +} + +// isOwnerRefUpdateRequired validates if relevant owner references is being +// set for lvm node. If not, it returns the final owner references that needs +// to be set. +func (c *NodeController) isOwnerRefsUpdateRequired(ownerRefs []metav1.OwnerReference) ([]metav1.OwnerReference, bool) { + updated := false + reqOwnerRef := c.ownerRef + for idx := range ownerRefs { + if ownerRefs[idx].UID != reqOwnerRef.UID { + continue + } + // in case owner reference exists, validate + // if controller field is set correctly or not. + if !reflect.DeepEqual(ownerRefs[idx].Controller, reqOwnerRef.Controller) { + updated = true + ownerRefs[idx].Controller = reqOwnerRef.Controller + } + return ownerRefs, updated + } + updated = true + ownerRefs = append(ownerRefs, reqOwnerRef) + return ownerRefs, updated +} diff --git a/pkg/mgmt/lvmnode/start.go b/pkg/mgmt/lvmnode/start.go new file mode 100644 index 00000000..cd5ee204 --- /dev/null +++ b/pkg/mgmt/lvmnode/start.go @@ -0,0 +1,108 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package lvmnode + +import ( + "sync" + "time" + + k8sapi "github.com/openebs/lib-csi/pkg/client/k8s" + clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" + informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" + "github.com/openebs/lvm-localpv/pkg/lvm" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" +) + +// Start starts the lvmnode controller. +func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { + + // Get in cluster config + cfg, err := k8sapi.Config().Get() + if err != nil { + return errors.Wrap(err, "error building kubeconfig") + } + + // Building Kubernetes Clientset + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "error building kubernetes clientset") + } + + // Building OpenEBS Clientset + openebsClient, err := clientset.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "error building openebs clientset") + } + + // setup watch only on node we are interested in. + nodeInformerFactory := informers.NewSharedInformerFactoryWithOptions( + openebsClient, 0, informers.WithNamespace(lvm.LvmNamespace), + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", lvm.NodeID).String() + })) + + k8sNode, err := kubeClient.CoreV1().Nodes().Get(lvm.NodeID, metav1.GetOptions{}) + if err != nil { + return errors.Wrapf(err, "fetch k8s node %s", lvm.NodeID) + } + isTrue := true + // as object returned by client go clears all TypeMeta from it. + nodeGVK := &schema.GroupVersionKind{ + Group: "", Version: "v1", Kind: "Node", + } + ownerRef := metav1.OwnerReference{ + APIVersion: nodeGVK.GroupVersion().String(), + Kind: nodeGVK.Kind, + Name: k8sNode.Name, + UID: k8sNode.GetUID(), + Controller: &isTrue, + } + + // Build() fn of all controllers calls AddToScheme to adds all types of this + // clientset into the given scheme. + // If multiple controllers happen to call this AddToScheme same time, + // it causes panic with error saying concurrent map access. + // This lock is used to serialize the AddToScheme call of all controllers. + controllerMtx.Lock() + + controller, err := NewNodeControllerBuilder(). + withKubeClient(kubeClient). + withOpenEBSClient(openebsClient). + withNodeSynced(nodeInformerFactory). + withNodeLister(nodeInformerFactory). + withRecorder(kubeClient). + withEventHandler(nodeInformerFactory). + withPollInterval(60 * time.Second). + withOwnerReference(ownerRef). + withWorkqueueRateLimiting().Build() + + // blocking call, can't use defer to release the lock + controllerMtx.Unlock() + + if err != nil { + return errors.Wrapf(err, "error building controller instance") + } + + nodeInformerFactory.Start(stopCh) + + // Threadiness defines the number of workers to be launched in Run function + return controller.Run(1, stopCh) +}