diff --git a/buildscripts/generate-manifests.sh b/buildscripts/generate-manifests.sh index 863e19a5..69adcaea 100755 --- a/buildscripts/generate-manifests.sh +++ b/buildscripts/generate-manifests.sh @@ -62,6 +62,21 @@ echo ' cat deploy/yamls/local.openebs.io_lvmsnapshots.yaml >> deploy/yamls/lvmsnapshot-crd.yaml rm deploy/yamls/local.openebs.io_lvmsnapshots.yaml +echo ' + +############################################## +########### ############ +########### LVMNode CRD ############ +########### ############ +############################################## + +# LVMNode CRD is autogenerated via `make manifests` command. +# Do the modification in the code and run the `make manifests` command +# to generate the CRD definition' > deploy/yamls/lvmnode-crd.yaml + +cat deploy/yamls/local.openebs.io_lvmnodes.yaml >> deploy/yamls/lvmnode-crd.yaml +rm deploy/yamls/local.openebs.io_lvmnodes.yaml + ## create the operator file using all the yamls echo '# This manifest is autogenerated via `make manifests` command @@ -81,6 +96,9 @@ cat deploy/yamls/lvmvolume-crd.yaml >> deploy/lvm-operator.yaml # Add LVMSnapshot v1alpha1 CRDs to the Operator yaml cat deploy/yamls/lvmsnapshot-crd.yaml >> deploy/lvm-operator.yaml +# Add LVMNode v1alpha1 CRDs to the Operator yaml +cat deploy/yamls/lvmnode-crd.yaml >> deploy/lvm-operator.yaml + # Add the driver deployment to the Operator yaml cat deploy/yamls/lvm-driver.yaml >> deploy/lvm-operator.yaml diff --git a/deploy/lvm-operator.yaml b/deploy/lvm-operator.yaml index df0a031a..b0594810 100644 --- a/deploy/lvm-operator.yaml +++ b/deploy/lvm-operator.yaml @@ -77,14 +77,10 @@ spec: description: LVMVolume represents a LVM based volume properties: apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' type: string kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' type: string metadata: type: object @@ -96,22 +92,17 @@ spec: minLength: 1 type: string ownerNodeID: - description: OwnerNodeID is the Node ID where the volume group is present - which is where the volume has been provisioned. OwnerNodeID can not - be edited after the volume has been provisioned. + description: OwnerNodeID is the Node ID where the volume group is present which is where the volume has been provisioned. OwnerNodeID can not be edited after the volume has been provisioned. minLength: 1 type: string shared: - description: Shared specifies whether the volume can be shared among - multiple pods. If it is not set to "yes", then the LVM LocalPV Driver - will not allow the volumes to be mounted by more than one pods. + description: Shared specifies whether the volume can be shared among multiple pods. If it is not set to "yes", then the LVM LocalPV Driver will not allow the volumes to be mounted by more than one pods. enum: - "yes" - "no" type: string volGroup: - description: VolGroup specifies the name of the volume group where the - volume has been created. + description: VolGroup specifies the name of the volume group where the volume has been created. minLength: 1 type: string required: @@ -120,14 +111,10 @@ spec: - volGroup type: object status: - description: VolStatus string that specifies the current state of the volume - provisioning request. + description: VolStatus string that specifies the current state of the volume provisioning request. properties: state: - description: State specifies the current state of the volume provisioning - request. The state "Pending" means that the volume creation request - has not processed yet. The state "Ready" means that the volume has - been created and it is ready for the use. + description: State specifies the current state of the volume provisioning request. The state "Pending" means that the volume creation request has not processed yet. The state "Ready" means that the volume has been created and it is ready for the use. enum: - Pending - Ready @@ -181,14 +168,10 @@ spec: description: LVMSnapshot represents an LVM Snapshot of the lvm volume properties: apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' type: string kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' type: string metadata: type: object @@ -200,22 +183,17 @@ spec: minLength: 1 type: string ownerNodeID: - description: OwnerNodeID is the Node ID where the volume group is present - which is where the volume has been provisioned. OwnerNodeID can not - be edited after the volume has been provisioned. + description: OwnerNodeID is the Node ID where the volume group is present which is where the volume has been provisioned. OwnerNodeID can not be edited after the volume has been provisioned. minLength: 1 type: string shared: - description: Shared specifies whether the volume can be shared among - multiple pods. If it is not set to "yes", then the LVM LocalPV Driver - will not allow the volumes to be mounted by more than one pods. + description: Shared specifies whether the volume can be shared among multiple pods. If it is not set to "yes", then the LVM LocalPV Driver will not allow the volumes to be mounted by more than one pods. enum: - "yes" - "no" type: string volGroup: - description: VolGroup specifies the name of the volume group where the - volume has been created. + description: VolGroup specifies the name of the volume group where the volume has been created. minLength: 1 type: string required: @@ -224,8 +202,7 @@ spec: - volGroup type: object status: - description: SnapStatus string that reflects if the snapshot was created - successfully + description: SnapStatus string that reflects if the snapshot was created successfully properties: state: type: string @@ -246,6 +223,108 @@ status: conditions: [] storedVersions: [] + +############################################## +########### ############ +########### LVMNode CRD ############ +########### ############ +############################################## + +# LVMNode CRD is autogenerated via `make manifests` command. +# Do the modification in the code and run the `make manifests` command +# to generate the CRD definition + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.2.8 + creationTimestamp: null + name: lvmnodes.local.openebs.io +spec: + group: local.openebs.io + names: + kind: LVMNode + listKind: LVMNodeList + plural: lvmnodes + shortNames: + - lvmnode + singular: lvmnode + preserveUnknownFields: false + scope: Namespaced + validation: + openAPIV3Schema: + description: LVMNode records information about all lvm volume groups available in a node. In general, the openebs node-agent creates the LVMNode object & periodically synchronizing the volume groups available in the node. LVMNode has an owner reference pointing to the corresponding node object. + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' + type: string + metadata: + type: object + volumeGroups: + items: + description: VolumeGroup specifies attributes of a given vg exists on node. + properties: + free: + anyOf: + - type: integer + - type: string + description: Free specifies the available capacity of volume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + lvCount: + description: LVCount denotes total number of logical volumes in volume group. + format: int32 + minimum: 0 + type: integer + name: + description: Name of the lvm volume group. + minLength: 1 + type: string + pvCount: + description: PVCount denotes total number of physical volumes constituting the volume group. + format: int32 + minimum: 0 + type: integer + size: + anyOf: + - type: integer + - type: string + description: Size specifies the total size of volume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + uuid: + description: UUID denotes a unique identity of a lvm volume group. + minLength: 1 + type: string + required: + - free + - lvCount + - name + - pvCount + - size + - uuid + type: object + type: array + required: + - volumeGroups + type: object + version: v1alpha1 + versions: + - name: v1alpha1 + served: true + storage: true +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] + --- # Create the CSI Driver object @@ -925,6 +1004,9 @@ rules: - apiGroups: ["storage.k8s.io"] resources: ["storageclasses", "csinodes"] verbs: ["get", "list", "watch"] + - apiGroups: [ "storage.k8s.io" ] + resources: [ "csistoragecapacities"] + verbs: ["*"] - apiGroups: [""] resources: ["events"] verbs: ["list", "watch", "create", "update", "patch"] @@ -938,7 +1020,7 @@ rules: resources: ["pods"] verbs: ["get", "list", "watch", "update", "patch"] - apiGroups: ["local.openebs.io"] - resources: ["lvmvolumes", "lvmsnapshots"] + resources: ["lvmvolumes", "lvmsnapshots", "lvmnodes"] verbs: ["*"] --- @@ -1145,7 +1227,7 @@ rules: resources: ["persistentvolumes", "nodes", "services"] verbs: ["get", "list"] - apiGroups: ["local.openebs.io"] - resources: ["lvmvolumes", "lvmsnapshots"] + resources: ["lvmvolumes", "lvmsnapshots", "lvmnodes"] verbs: ["get", "list", "watch", "create", "update", "patch"] --- diff --git a/deploy/yamls/lvm-driver.yaml b/deploy/yamls/lvm-driver.yaml index 87649164..9b8440b9 100644 --- a/deploy/yamls/lvm-driver.yaml +++ b/deploy/yamls/lvm-driver.yaml @@ -678,6 +678,9 @@ rules: - apiGroups: ["storage.k8s.io"] resources: ["storageclasses", "csinodes"] verbs: ["get", "list", "watch"] + - apiGroups: [ "storage.k8s.io" ] + resources: [ "csistoragecapacities"] + verbs: ["*"] - apiGroups: [""] resources: ["events"] verbs: ["list", "watch", "create", "update", "patch"] @@ -691,7 +694,7 @@ rules: resources: ["pods"] verbs: ["get", "list", "watch", "update", "patch"] - apiGroups: ["local.openebs.io"] - resources: ["lvmvolumes", "lvmsnapshots"] + resources: ["lvmvolumes", "lvmsnapshots", "lvmnodes"] verbs: ["*"] --- @@ -898,7 +901,7 @@ rules: resources: ["persistentvolumes", "nodes", "services"] verbs: ["get", "list"] - apiGroups: ["local.openebs.io"] - resources: ["lvmvolumes", "lvmsnapshots"] + resources: ["lvmvolumes", "lvmsnapshots", "lvmnodes"] verbs: ["get", "list", "watch", "create", "update", "patch"] --- diff --git a/deploy/yamls/lvmnode-crd.yaml b/deploy/yamls/lvmnode-crd.yaml new file mode 100644 index 00000000..b9b3effb --- /dev/null +++ b/deploy/yamls/lvmnode-crd.yaml @@ -0,0 +1,102 @@ + + +############################################## +########### ############ +########### LVMNode CRD ############ +########### ############ +############################################## + +# LVMNode CRD is autogenerated via `make manifests` command. +# Do the modification in the code and run the `make manifests` command +# to generate the CRD definition + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.2.8 + creationTimestamp: null + name: lvmnodes.local.openebs.io +spec: + group: local.openebs.io + names: + kind: LVMNode + listKind: LVMNodeList + plural: lvmnodes + shortNames: + - lvmnode + singular: lvmnode + preserveUnknownFields: false + scope: Namespaced + validation: + openAPIV3Schema: + description: LVMNode records information about all lvm volume groups available in a node. In general, the openebs node-agent creates the LVMNode object & periodically synchronizing the volume groups available in the node. LVMNode has an owner reference pointing to the corresponding node object. + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' + type: string + metadata: + type: object + volumeGroups: + items: + description: VolumeGroup specifies attributes of a given vg exists on node. + properties: + free: + anyOf: + - type: integer + - type: string + description: Free specifies the available capacity of volume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + lvCount: + description: LVCount denotes total number of logical volumes in volume group. + format: int32 + minimum: 0 + type: integer + name: + description: Name of the lvm volume group. + minLength: 1 + type: string + pvCount: + description: PVCount denotes total number of physical volumes constituting the volume group. + format: int32 + minimum: 0 + type: integer + size: + anyOf: + - type: integer + - type: string + description: Size specifies the total size of volume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + uuid: + description: UUID denotes a unique identity of a lvm volume group. + minLength: 1 + type: string + required: + - free + - lvCount + - name + - pvCount + - size + - uuid + type: object + type: array + required: + - volumeGroups + type: object + version: v1alpha1 + versions: + - name: v1alpha1 + served: true + storage: true +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/deploy/yamls/lvmsnapshot-crd.yaml b/deploy/yamls/lvmsnapshot-crd.yaml index 1c484f42..2f80fef7 100644 --- a/deploy/yamls/lvmsnapshot-crd.yaml +++ b/deploy/yamls/lvmsnapshot-crd.yaml @@ -32,14 +32,10 @@ spec: description: LVMSnapshot represents an LVM Snapshot of the lvm volume properties: apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' type: string kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' type: string metadata: type: object @@ -51,22 +47,17 @@ spec: minLength: 1 type: string ownerNodeID: - description: OwnerNodeID is the Node ID where the volume group is present - which is where the volume has been provisioned. OwnerNodeID can not - be edited after the volume has been provisioned. + description: OwnerNodeID is the Node ID where the volume group is present which is where the volume has been provisioned. OwnerNodeID can not be edited after the volume has been provisioned. minLength: 1 type: string shared: - description: Shared specifies whether the volume can be shared among - multiple pods. If it is not set to "yes", then the LVM LocalPV Driver - will not allow the volumes to be mounted by more than one pods. + description: Shared specifies whether the volume can be shared among multiple pods. If it is not set to "yes", then the LVM LocalPV Driver will not allow the volumes to be mounted by more than one pods. enum: - "yes" - "no" type: string volGroup: - description: VolGroup specifies the name of the volume group where the - volume has been created. + description: VolGroup specifies the name of the volume group where the volume has been created. minLength: 1 type: string required: @@ -75,8 +66,7 @@ spec: - volGroup type: object status: - description: SnapStatus string that reflects if the snapshot was created - successfully + description: SnapStatus string that reflects if the snapshot was created successfully properties: state: type: string diff --git a/deploy/yamls/lvmvolume-crd.yaml b/deploy/yamls/lvmvolume-crd.yaml index a78e5a91..0bc4c086 100644 --- a/deploy/yamls/lvmvolume-crd.yaml +++ b/deploy/yamls/lvmvolume-crd.yaml @@ -56,14 +56,10 @@ spec: description: LVMVolume represents a LVM based volume properties: apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources' type: string kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' type: string metadata: type: object @@ -75,22 +71,17 @@ spec: minLength: 1 type: string ownerNodeID: - description: OwnerNodeID is the Node ID where the volume group is present - which is where the volume has been provisioned. OwnerNodeID can not - be edited after the volume has been provisioned. + description: OwnerNodeID is the Node ID where the volume group is present which is where the volume has been provisioned. OwnerNodeID can not be edited after the volume has been provisioned. minLength: 1 type: string shared: - description: Shared specifies whether the volume can be shared among - multiple pods. If it is not set to "yes", then the LVM LocalPV Driver - will not allow the volumes to be mounted by more than one pods. + description: Shared specifies whether the volume can be shared among multiple pods. If it is not set to "yes", then the LVM LocalPV Driver will not allow the volumes to be mounted by more than one pods. enum: - "yes" - "no" type: string volGroup: - description: VolGroup specifies the name of the volume group where the - volume has been created. + description: VolGroup specifies the name of the volume group where the volume has been created. minLength: 1 type: string required: @@ -99,14 +90,10 @@ spec: - volGroup type: object status: - description: VolStatus string that specifies the current state of the volume - provisioning request. + description: VolStatus string that specifies the current state of the volume provisioning request. properties: state: - description: State specifies the current state of the volume provisioning - request. The state "Pending" means that the volume creation request - has not processed yet. The state "Ready" means that the volume has - been created and it is ready for the use. + description: State specifies the current state of the volume provisioning request. The state "Pending" means that the volume creation request has not processed yet. The state "Ready" means that the volume has been created and it is ready for the use. enum: - Pending - Ready diff --git a/pkg/apis/openebs.io/lvm/v1alpha1/lvmnode.go b/pkg/apis/openebs.io/lvm/v1alpha1/lvmnode.go new file mode 100644 index 00000000..38de28aa --- /dev/null +++ b/pkg/apis/openebs.io/lvm/v1alpha1/lvmnode.go @@ -0,0 +1,80 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +resource:path=lvmnode + +// LVMNode records information about all lvm volume groups available +// in a node. In general, the openebs node-agent creates the LVMNode +// object & periodically synchronizing the volume groups available in the node. +// LVMNode has an owner reference pointing to the corresponding node object. +// +kubebuilder:object:root=true +// +kubebuilder:resource:scope=Namespaced,shortName=lvmnode +type LVMNode struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + VolumeGroups []VolumeGroup `json:"volumeGroups"` +} + +// VolumeGroup specifies attributes of a given vg exists on node. +type VolumeGroup struct { + // Name of the lvm volume group. + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + Name string `json:"name"` + + // UUID denotes a unique identity of a lvm volume group. + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + UUID string `json:"uuid"` + + // Size specifies the total size of volume group. + // +kubebuilder:validation:Required + Size resource.Quantity `json:"size"` + // Free specifies the available capacity of volume group. + // +kubebuilder:validation:Required + Free resource.Quantity `json:"free"` + + // LVCount denotes total number of logical volumes in + // volume group. + // +kubebuilder:validation:Required + // +kubebuilder:validation:Minimum=0 + LVCount int32 `json:"lvCount"` + // PVCount denotes total number of physical volumes + // constituting the volume group. + // +kubebuilder:validation:Required + // +kubebuilder:validation:Minimum=0 + PVCount int32 `json:"pvCount"` +} + +// LVMNodeList is a collection of LVMNode resources +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +resource:path=lvmnodes +type LVMNodeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + + Items []LVMNode `json:"items"` +} diff --git a/pkg/apis/openebs.io/lvm/v1alpha1/register.go b/pkg/apis/openebs.io/lvm/v1alpha1/register.go index e05b4c26..867f78e9 100644 --- a/pkg/apis/openebs.io/lvm/v1alpha1/register.go +++ b/pkg/apis/openebs.io/lvm/v1alpha1/register.go @@ -73,6 +73,9 @@ func addKnownTypes(scheme *runtime.Scheme) error { &LVMVolumeList{}, &LVMSnapshot{}, &LVMSnapshotList{}, + + &LVMNode{}, + &LVMNodeList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go index 16c02c22..d365534c 100644 --- a/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go @@ -24,6 +24,72 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LVMNode) DeepCopyInto(out *LVMNode) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + if in.VolumeGroups != nil { + in, out := &in.VolumeGroups, &out.VolumeGroups + *out = make([]VolumeGroup, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LVMNode. +func (in *LVMNode) DeepCopy() *LVMNode { + if in == nil { + return nil + } + out := new(LVMNode) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *LVMNode) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LVMNodeList) DeepCopyInto(out *LVMNodeList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]LVMNode, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LVMNodeList. +func (in *LVMNodeList) DeepCopy() *LVMNodeList { + if in == nil { + return nil + } + out := new(LVMNodeList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *LVMNodeList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LVMSnapshot) DeepCopyInto(out *LVMSnapshot) { *out = *in @@ -178,6 +244,24 @@ func (in *VolStatus) DeepCopy() *VolStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *VolumeGroup) DeepCopyInto(out *VolumeGroup) { + *out = *in + out.Size = in.Size.DeepCopy() + out.Free = in.Free.DeepCopy() + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VolumeGroup. +func (in *VolumeGroup) DeepCopy() *VolumeGroup { + if in == nil { + return nil + } + out := new(VolumeGroup) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VolumeInfo) DeepCopyInto(out *VolumeInfo) { *out = *in diff --git a/pkg/builder/nodebuilder/kubernetes.go b/pkg/builder/nodebuilder/kubernetes.go new file mode 100644 index 00000000..8375d463 --- /dev/null +++ b/pkg/builder/nodebuilder/kubernetes.go @@ -0,0 +1,427 @@ +// Copyright © 2019 The OpenEBS Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nodebuilder + +import ( + "encoding/json" + + client "github.com/openebs/lib-csi/pkg/common/kubernetes/client" + apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// getClientsetFn is a typed function that +// abstracts fetching of internal clientset +type getClientsetFn func() (clientset *clientset.Clientset, err error) + +// getClientsetFromPathFn is a typed function that +// abstracts fetching of clientset from kubeConfigPath +type getClientsetForPathFn func(kubeConfigPath string) ( + clientset *clientset.Clientset, + err error, +) + +// createFn is a typed function that abstracts +// creating lvm node instance +type createFn func( + cs *clientset.Clientset, + upgradeResultObj *apis.LVMNode, + namespace string, +) (*apis.LVMNode, error) + +// getFn is a typed function that abstracts +// fetching a lvm node instance +type getFn func( + cli *clientset.Clientset, + name, + namespace string, + opts metav1.GetOptions, +) (*apis.LVMNode, error) + +// listFn is a typed function that abstracts +// listing of lvm node instances +type listFn func( + cli *clientset.Clientset, + namespace string, + opts metav1.ListOptions, +) (*apis.LVMNodeList, error) + +// delFn is a typed function that abstracts +// deleting a lvm node instance +type delFn func( + cli *clientset.Clientset, + name, + namespace string, + opts *metav1.DeleteOptions, +) error + +// updateFn is a typed function that abstracts +// updating lvm node instance +type updateFn func( + cs *clientset.Clientset, + node *apis.LVMNode, + namespace string, +) (*apis.LVMNode, error) + +// Kubeclient enables kubernetes API operations +// on lvm node instance +type Kubeclient struct { + // clientset refers to lvm node's + // clientset that will be responsible to + // make kubernetes API calls + clientset *clientset.Clientset + + kubeConfigPath string + + // namespace holds the namespace on which + // kubeclient has to operate + namespace string + + // functions useful during mocking + getClientset getClientsetFn + getClientsetForPath getClientsetForPathFn + get getFn + list listFn + del delFn + create createFn + update updateFn +} + +// KubeclientBuildOption defines the abstraction +// to build a kubeclient instance +type KubeclientBuildOption func(*Kubeclient) + +// defaultGetClientset is the default implementation to +// get kubernetes clientset instance +func defaultGetClientset() (clients *clientset.Clientset, err error) { + + config, err := client.GetConfig(client.New()) + if err != nil { + return nil, err + } + + return clientset.NewForConfig(config) + +} + +// defaultGetClientsetForPath is the default implementation to +// get kubernetes clientset instance based on the given +// kubeconfig path +func defaultGetClientsetForPath( + kubeConfigPath string, +) (clients *clientset.Clientset, err error) { + config, err := client.GetConfig( + client.New(client.WithKubeConfigPath(kubeConfigPath))) + if err != nil { + return nil, err + } + + return clientset.NewForConfig(config) +} + +// defaultGet is the default implementation to get +// a lvm node instance in kubernetes cluster +func defaultGet( + cli *clientset.Clientset, + name, namespace string, + opts metav1.GetOptions, +) (*apis.LVMNode, error) { + return cli.LocalV1alpha1(). + LVMNodes(namespace). + Get(name, opts) +} + +// defaultList is the default implementation to list +// lvm node instances in kubernetes cluster +func defaultList( + cli *clientset.Clientset, + namespace string, + opts metav1.ListOptions, +) (*apis.LVMNodeList, error) { + return cli.LocalV1alpha1(). + LVMNodes(namespace). + List(opts) +} + +// defaultCreate is the default implementation to delete +// a lvm node instance in kubernetes cluster +func defaultDel( + cli *clientset.Clientset, + name, namespace string, + opts *metav1.DeleteOptions, +) error { + deletePropagation := metav1.DeletePropagationForeground + opts.PropagationPolicy = &deletePropagation + err := cli.LocalV1alpha1(). + LVMNodes(namespace). + Delete(name, opts) + return err +} + +// defaultCreate is the default implementation to create +// a lvm node instance in kubernetes cluster +func defaultCreate( + cli *clientset.Clientset, + node *apis.LVMNode, + namespace string, +) (*apis.LVMNode, error) { + return cli.LocalV1alpha1(). + LVMNodes(namespace). + Create(node) +} + +// defaultUpdate is the default implementation to update +// a lvm node instance in kubernetes cluster +func defaultUpdate( + cli *clientset.Clientset, + node *apis.LVMNode, + namespace string, +) (*apis.LVMNode, error) { + return cli.LocalV1alpha1(). + LVMNodes(namespace). + Update(node) +} + +// withDefaults sets the default options +// of kubeclient instance +func (k *Kubeclient) withDefaults() { + if k.getClientset == nil { + k.getClientset = defaultGetClientset + } + if k.getClientsetForPath == nil { + k.getClientsetForPath = defaultGetClientsetForPath + } + if k.get == nil { + k.get = defaultGet + } + if k.list == nil { + k.list = defaultList + } + if k.del == nil { + k.del = defaultDel + } + if k.create == nil { + k.create = defaultCreate + } + if k.update == nil { + k.update = defaultUpdate + } +} + +// WithClientSet sets the kubernetes client against +// the kubeclient instance +func WithClientSet(c *clientset.Clientset) KubeclientBuildOption { + return func(k *Kubeclient) { + k.clientset = c + } +} + +// WithNamespace sets the kubernetes client against +// the provided namespace +func WithNamespace(namespace string) KubeclientBuildOption { + return func(k *Kubeclient) { + k.namespace = namespace + } +} + +// WithNamespace sets the provided namespace +// against this Kubeclient instance +func (k *Kubeclient) WithNamespace(namespace string) *Kubeclient { + k.namespace = namespace + return k +} + +// WithKubeConfigPath sets the kubernetes client +// against the provided path +func WithKubeConfigPath(path string) KubeclientBuildOption { + return func(k *Kubeclient) { + k.kubeConfigPath = path + } +} + +// NewKubeclient returns a new instance of +// kubeclient meant for lvm node operations +func NewKubeclient(opts ...KubeclientBuildOption) *Kubeclient { + k := &Kubeclient{} + for _, o := range opts { + o(k) + } + + k.withDefaults() + return k +} + +func (k *Kubeclient) getClientsetForPathOrDirect() ( + *clientset.Clientset, + error, +) { + if k.kubeConfigPath != "" { + return k.getClientsetForPath(k.kubeConfigPath) + } + + return k.getClientset() +} + +// getClientOrCached returns either a new instance +// of kubernetes client or its cached copy +func (k *Kubeclient) getClientOrCached() (*clientset.Clientset, error) { + if k.clientset != nil { + return k.clientset, nil + } + + c, err := k.getClientsetForPathOrDirect() + if err != nil { + return nil, + errors.Wrapf( + err, + "failed to get clientset", + ) + } + + k.clientset = c + return k.clientset, nil +} + +// Create creates a lvm node instance +// in kubernetes cluster +func (k *Kubeclient) Create(node *apis.LVMNode) (*apis.LVMNode, error) { + if node == nil { + return nil, + errors.New( + "failed to create lvm node: nil node object", + ) + } + cs, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to create lvm node {%s} in namespace {%s}", + node.Name, + k.namespace, + ) + } + + return k.create(cs, node, k.namespace) +} + +// Get returns lvm node object for given name +func (k *Kubeclient) Get( + name string, + opts metav1.GetOptions, +) (*apis.LVMNode, error) { + if name == "" { + return nil, + errors.New( + "failed to get lvm node: missing lvm node name", + ) + } + + cli, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to get lvm node {%s} in namespace {%s}", + name, + k.namespace, + ) + } + + return k.get(cli, name, k.namespace, opts) +} + +// GetRaw returns lvm node instance +// in bytes +func (k *Kubeclient) GetRaw( + name string, + opts metav1.GetOptions, +) ([]byte, error) { + if name == "" { + return nil, errors.New( + "failed to get raw lvm node: missing node name", + ) + } + csiv, err := k.Get(name, opts) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to get lvm node {%s} in namespace {%s}", + name, + k.namespace, + ) + } + + return json.Marshal(csiv) +} + +// List returns a list of lvm node +// instances present in kubernetes cluster +func (k *Kubeclient) List(opts metav1.ListOptions) (*apis.LVMNodeList, error) { + cli, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to list lvm nodes in namespace {%s}", + k.namespace, + ) + } + + return k.list(cli, k.namespace, opts) +} + +// Delete deletes the lvm node from +// kubernetes +func (k *Kubeclient) Delete(name string) error { + if name == "" { + return errors.New( + "failed to delete lvmnode: missing node name", + ) + } + cli, err := k.getClientOrCached() + if err != nil { + return errors.Wrapf( + err, + "failed to delete lvmnode {%s} in namespace {%s}", + name, + k.namespace, + ) + } + + return k.del(cli, name, k.namespace, &metav1.DeleteOptions{}) +} + +// Update updates this lvm node instance +// against kubernetes cluster +func (k *Kubeclient) Update(node *apis.LVMNode) (*apis.LVMNode, error) { + if node == nil { + return nil, + errors.New( + "failed to update lvmnode: nil node object", + ) + } + + cs, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to update lvmnode {%s} in namespace {%s}", + node.Name, + node.Namespace, + ) + } + + return k.update(cs, node, k.namespace) +} diff --git a/pkg/builder/nodebuilder/node.go b/pkg/builder/nodebuilder/node.go new file mode 100644 index 00000000..2e4a0802 --- /dev/null +++ b/pkg/builder/nodebuilder/node.go @@ -0,0 +1,122 @@ +/* +Copyright 2019 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodebuilder + +import ( + apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Builder is the builder object for LVMNode +type Builder struct { + node *LVMNode + errs []error +} + +// LVMNode is a wrapper over +// LVMNode API instance +type LVMNode struct { + // LVMVolume object + Object *apis.LVMNode +} + +// From returns a new instance of +// lvm volume +func From(node *apis.LVMNode) *LVMNode { + return &LVMNode{ + Object: node, + } +} + +// NewBuilder returns new instance of Builder +func NewBuilder() *Builder { + return &Builder{ + node: &LVMNode{ + Object: &apis.LVMNode{}, + }, + } +} + +// BuildFrom returns new instance of Builder +// from the provided api instance +func BuildFrom(node *apis.LVMNode) *Builder { + if node == nil { + b := NewBuilder() + b.errs = append( + b.errs, + errors.New("failed to build lvm node object: nil node"), + ) + return b + } + return &Builder{ + node: &LVMNode{ + Object: node, + }, + } +} + +// WithNamespace sets the namespace of LVMNode +func (b *Builder) WithNamespace(namespace string) *Builder { + if namespace == "" { + b.errs = append( + b.errs, + errors.New( + "failed to build lvm node object: missing namespace", + ), + ) + return b + } + b.node.Object.Namespace = namespace + return b +} + +// WithName sets the name of LVMNode +func (b *Builder) WithName(name string) *Builder { + if name == "" { + b.errs = append( + b.errs, + errors.New( + "failed to build lvm node object: missing name", + ), + ) + return b + } + b.node.Object.Name = name + return b +} + +// WithVolumeGroups sets the volume groups of LVMNode +func (b *Builder) WithVolumeGroups(vgs []apis.VolumeGroup) *Builder { + b.node.Object.VolumeGroups = vgs + return b +} + +// WithOwnerReferences sets the owner references of LVMNode +func (b *Builder) WithOwnerReferences(ownerRefs ...metav1.OwnerReference) *Builder { + b.node.Object.OwnerReferences = ownerRefs + return b +} + +// Build returns LVMNode API object +func (b *Builder) Build() (*apis.LVMNode, error) { + if len(b.errs) > 0 { + return nil, errors.Errorf("%+v", b.errs) + } + + return b.node.Object, nil +} diff --git a/pkg/driver/agent.go b/pkg/driver/agent.go index fd6fa2d9..56db2111 100644 --- a/pkg/driver/agent.go +++ b/pkg/driver/agent.go @@ -27,6 +27,7 @@ import ( apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" "github.com/openebs/lvm-localpv/pkg/builder/volbuilder" "github.com/openebs/lvm-localpv/pkg/lvm" + "github.com/openebs/lvm-localpv/pkg/mgmt/lvmnode" "github.com/openebs/lvm-localpv/pkg/mgmt/snapshot" "github.com/openebs/lvm-localpv/pkg/mgmt/volume" "golang.org/x/net/context" @@ -52,6 +53,13 @@ func NewNode(d *CSIDriver) csi.NodeServer { // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() + go func() { + err := lvmnode.Start(&ControllerMutex, stopCh) + if err != nil { + klog.Fatalf("Failed to start LVM node controller: %s", err.Error()) + } + }() + // start the lvmvolume watcher go func() { err := volume.Start(&ControllerMutex, stopCh) diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 11439773..1df37361 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -18,16 +18,27 @@ package driver import ( "fmt" - "github.com/openebs/lvm-localpv/pkg/builder/snapbuilder" "strconv" "strings" "time" + k8sapi "github.com/openebs/lib-csi/pkg/client/k8s" + "github.com/openebs/lvm-localpv/pkg/builder/snapbuilder" + clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" + informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/runtime/signals" + "github.com/container-storage-interface/spec/lib/go/csi" + lvmapi "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" k8serror "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" "k8s.io/klog" errors "github.com/openebs/lib-csi/pkg/common/errors" @@ -52,15 +63,26 @@ const ( type controller struct { driver *CSIDriver capabilities []*csi.ControllerServiceCapability + + indexedLabel string + + k8sNodeInformer cache.SharedIndexInformer + lvmNodeInformer cache.SharedIndexInformer } // NewController returns a new instance // of CSI controller func NewController(d *CSIDriver) csi.ControllerServer { - return &controller{ + ctrl := &controller{ driver: d, capabilities: newControllerCapabilities(), } + + if err := ctrl.init(); err != nil { + klog.Fatalf("init controller: %v", err) + } + + return ctrl } // SupportedVolumeCapabilityAccessModes contains the list of supported access @@ -133,6 +155,50 @@ func waitForVolDestroy(volname string) error { return nil } +func (cs *controller) init() error { + cfg, err := k8sapi.Config().Get() + if err != nil { + return errors.Wrapf(err, "failed to build kubeconfig") + } + + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "failed to build k8s clientset") + } + + openebsClient, err := clientset.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "failed to build openebs clientset") + } + + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, 0) + openebsInformerfactory := informers.NewSharedInformerFactoryWithOptions(openebsClient, + 0, informers.WithNamespace(lvm.LvmNamespace)) + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + cs.k8sNodeInformer = kubeInformerFactory.Core().V1().Nodes().Informer() + cs.lvmNodeInformer = openebsInformerfactory.Local().V1alpha1().LVMNodes().Informer() + + if err = cs.lvmNodeInformer.AddIndexers(map[string]cache.IndexFunc{ + LabelIndexName(cs.indexedLabel): LabelIndexFunc(cs.indexedLabel), + }); err != nil { + return errors.Wrapf(err, "failed to add index on label %v", cs.indexedLabel) + } + + go cs.k8sNodeInformer.Run(stopCh) + go cs.lvmNodeInformer.Run(stopCh) + + // wait for all the caches to be populated. + klog.Info("waiting for k8s & lvm node informer caches to be synced") + cache.WaitForCacheSync(stopCh, + cs.k8sNodeInformer.HasSynced, + cs.lvmNodeInformer.HasSynced) + klog.Info("synced k8s & lvm node informer caches") + return nil +} + // CreateLVMVolume create new lvm volume for csi volume request func CreateLVMVolume(req *csi.CreateVolumeRequest) (string, error) { volName := strings.ToLower(req.GetName()) @@ -611,7 +677,7 @@ func (cs *controller) ControllerPublishVolume( } // GetCapacity return the capacity of the -// given volume +// given node topology segment. // // This implements csi.ControllerServer func (cs *controller) GetCapacity( @@ -619,7 +685,89 @@ func (cs *controller) GetCapacity( req *csi.GetCapacityRequest, ) (*csi.GetCapacityResponse, error) { - return nil, status.Error(codes.Unimplemented, "") + var segments map[string]string + if topology := req.GetAccessibleTopology(); topology != nil { + segments = topology.Segments + } + nodeNames, err := cs.filterNodesByTopology(segments) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + lvmNodesCache := cs.lvmNodeInformer.GetIndexer() + params := req.GetParameters() + vgParam := helpers.GetInsensitiveParameter(¶ms, "volgroup") + + var availableCapacity int64 + for _, nodeName := range nodeNames { + v, exists, err := lvmNodesCache.GetByKey(lvm.LvmNamespace + "/" + nodeName) + if err != nil { + klog.Warning("unexpected error after querying the lvmNode informer cache") + continue + } + if !exists { + continue + } + lvmNode := v.(*lvmapi.LVMNode) + // rather than summing all free capacity, we are calculating maximum + // lv size that gets fit in given vg. + // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1472-storage-capacity-tracking#available-capacity-vs-maximum-volume-size & + // https://github.com/container-storage-interface/spec/issues/432 for more details + for _, vg := range lvmNode.VolumeGroups { + if vg.Name != vgParam { + continue + } + freeCapacity := vg.Free.Value() + if availableCapacity < freeCapacity { + availableCapacity = freeCapacity + } + } + } + + return &csi.GetCapacityResponse{ + AvailableCapacity: availableCapacity, + }, nil +} + +func (cs *controller) filterNodesByTopology(segments map[string]string) ([]string, error) { + nodesCache := cs.k8sNodeInformer.GetIndexer() + if len(segments) == 0 { + return nodesCache.ListKeys(), nil + } + + filterNodes := func(vs []interface{}) ([]string, error) { + var names []string + selector := labels.SelectorFromSet(segments) + for _, v := range vs { + meta, err := apimeta.Accessor(v) + if err != nil { + return nil, err + } + if selector.Matches(labels.Set(meta.GetLabels())) { + names = append(names, meta.GetName()) + } + } + return names, nil + } + + // first see if we need to filter the informer cache by indexed label, + // so that we don't need to iterate over all the nodes for performance + // reasons in large cluster. + indexName := LabelIndexName(cs.indexedLabel) + if _, ok := nodesCache.GetIndexers()[indexName]; !ok { + // run through all the nodes in case indexer doesn't exists. + return filterNodes(nodesCache.List()) + } + + if segValue, ok := segments[cs.indexedLabel]; ok { + vs, err := nodesCache.ByIndex(indexName, segValue) + if err != nil { + return nil, errors.Wrapf(err, "query indexed store indexName=%v indexKey=%v", + indexName, segValue) + } + return filterNodes(vs) + } + return filterNodes(nodesCache.List()) } // ListVolumes lists all the volumes @@ -700,6 +848,7 @@ func newControllerCapabilities() []*csi.ControllerServiceCapability { csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, + csi.ControllerServiceCapability_RPC_GET_CAPACITY, } { capabilities = append(capabilities, fromType(cap)) } @@ -787,3 +936,24 @@ func validateSnapshotRequest(req *csi.CreateSnapshotRequest) error { } return nil } + +// LabelIndexName add prefix for label index. +func LabelIndexName(label string) string { + return "l:" + label +} + +// LabelIndexFunc defines index values for given label. +func LabelIndexFunc(label string) cache.IndexFunc { + return func(obj interface{}) ([]string, error) { + meta, err := apimeta.Accessor(obj) + if err != nil { + return nil, fmt.Errorf( + "k8s api object type (%T) doesn't implements metav1.Object interface: %v", obj, err) + } + var vs []string + if v, ok := meta.GetLabels()[label]; ok { + vs = append(vs, v) + } + return vs, nil + } +} diff --git a/pkg/equality/semantic.go b/pkg/equality/semantic.go new file mode 100644 index 00000000..3da98f29 --- /dev/null +++ b/pkg/equality/semantic.go @@ -0,0 +1,55 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package equality + +import ( + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" +) + +// Since we are using quite older version of k8s.io/* libraries, we need take up +// their upgrades as part of separate pull request. +// +// Copied from https://github.com/kubernetes/apimachinery/blob/c93b0f84892eb6bcbc80b312ae70729d8168bc7e/pkg/api/equality/semantic.go +// More details at - https://github.com/kubernetes/apimachinery/issues/75 + +// Semantic can do semantic deep equality checks for api objects. +// Example: apiequality.Semantic.DeepEqual(aPod, aPodWithNonNilButEmptyMaps) == true +var Semantic = conversion.EqualitiesOrDie( + func(a, b resource.Quantity) bool { + // Ignore formatting, only care that numeric value stayed the same. + // TODO: if we decide it's important, it should be safe to start comparing the format. + // + // Uninitialized quantities are equivalent to 0 quantities. + return a.Cmp(b) == 0 + }, + func(a, b metav1.MicroTime) bool { + return a.UTC() == b.UTC() + }, + func(a, b metav1.Time) bool { + return a.UTC() == b.UTC() + }, + func(a, b labels.Selector) bool { + return a.String() == b.String() + }, + func(a, b fields.Selector) bool { + return a.String() == b.String() + }, +) diff --git a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvm_client.go b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvm_client.go index 9eabcf68..6642add4 100644 --- a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvm_client.go +++ b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvm_client.go @@ -28,6 +28,10 @@ type FakeLocalV1alpha1 struct { *testing.Fake } +func (c *FakeLocalV1alpha1) LVMNodes(namespace string) v1alpha1.LVMNodeInterface { + return &FakeLVMNodes{c, namespace} +} + func (c *FakeLocalV1alpha1) LVMSnapshots(namespace string) v1alpha1.LVMSnapshotInterface { return &FakeLVMSnapshots{c, namespace} } diff --git a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvmnode.go b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvmnode.go new file mode 100644 index 00000000..cf8424ed --- /dev/null +++ b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/fake/fake_lvmnode.go @@ -0,0 +1,128 @@ +/* +Copyright 2021 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeLVMNodes implements LVMNodeInterface +type FakeLVMNodes struct { + Fake *FakeLocalV1alpha1 + ns string +} + +var lvmnodesResource = schema.GroupVersionResource{Group: "local.openebs.io", Version: "v1alpha1", Resource: "lvmnodes"} + +var lvmnodesKind = schema.GroupVersionKind{Group: "local.openebs.io", Version: "v1alpha1", Kind: "LVMNode"} + +// Get takes name of the lVMNode, and returns the corresponding lVMNode object, and an error if there is any. +func (c *FakeLVMNodes) Get(name string, options v1.GetOptions) (result *v1alpha1.LVMNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(lvmnodesResource, c.ns, name), &v1alpha1.LVMNode{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LVMNode), err +} + +// List takes label and field selectors, and returns the list of LVMNodes that match those selectors. +func (c *FakeLVMNodes) List(opts v1.ListOptions) (result *v1alpha1.LVMNodeList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(lvmnodesResource, lvmnodesKind, c.ns, opts), &v1alpha1.LVMNodeList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.LVMNodeList{ListMeta: obj.(*v1alpha1.LVMNodeList).ListMeta} + for _, item := range obj.(*v1alpha1.LVMNodeList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested lVMNodes. +func (c *FakeLVMNodes) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(lvmnodesResource, c.ns, opts)) + +} + +// Create takes the representation of a lVMNode and creates it. Returns the server's representation of the lVMNode, and an error, if there is any. +func (c *FakeLVMNodes) Create(lVMNode *v1alpha1.LVMNode) (result *v1alpha1.LVMNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(lvmnodesResource, c.ns, lVMNode), &v1alpha1.LVMNode{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LVMNode), err +} + +// Update takes the representation of a lVMNode and updates it. Returns the server's representation of the lVMNode, and an error, if there is any. +func (c *FakeLVMNodes) Update(lVMNode *v1alpha1.LVMNode) (result *v1alpha1.LVMNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(lvmnodesResource, c.ns, lVMNode), &v1alpha1.LVMNode{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LVMNode), err +} + +// Delete takes name of the lVMNode and deletes it. Returns an error if one occurs. +func (c *FakeLVMNodes) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(lvmnodesResource, c.ns, name), &v1alpha1.LVMNode{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeLVMNodes) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(lvmnodesResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.LVMNodeList{}) + return err +} + +// Patch applies the patch and returns the patched lVMNode. +func (c *FakeLVMNodes) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.LVMNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(lvmnodesResource, c.ns, name, pt, data, subresources...), &v1alpha1.LVMNode{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LVMNode), err +} diff --git a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/generated_expansion.go b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/generated_expansion.go index 794b878d..7e7d8b46 100644 --- a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/generated_expansion.go +++ b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/generated_expansion.go @@ -18,6 +18,8 @@ limitations under the License. package v1alpha1 +type LVMNodeExpansion interface{} + type LVMSnapshotExpansion interface{} type LVMVolumeExpansion interface{} diff --git a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvm_client.go b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvm_client.go index e7c713ca..85a70fb3 100644 --- a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvm_client.go +++ b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvm_client.go @@ -26,6 +26,7 @@ import ( type LocalV1alpha1Interface interface { RESTClient() rest.Interface + LVMNodesGetter LVMSnapshotsGetter LVMVolumesGetter } @@ -35,6 +36,10 @@ type LocalV1alpha1Client struct { restClient rest.Interface } +func (c *LocalV1alpha1Client) LVMNodes(namespace string) LVMNodeInterface { + return newLVMNodes(c, namespace) +} + func (c *LocalV1alpha1Client) LVMSnapshots(namespace string) LVMSnapshotInterface { return newLVMSnapshots(c, namespace) } diff --git a/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvmnode.go b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvmnode.go new file mode 100644 index 00000000..e9b560d2 --- /dev/null +++ b/pkg/generated/clientset/internalclientset/typed/lvm/v1alpha1/lvmnode.go @@ -0,0 +1,174 @@ +/* +Copyright 2021 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "time" + + v1alpha1 "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + scheme "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// LVMNodesGetter has a method to return a LVMNodeInterface. +// A group's client should implement this interface. +type LVMNodesGetter interface { + LVMNodes(namespace string) LVMNodeInterface +} + +// LVMNodeInterface has methods to work with LVMNode resources. +type LVMNodeInterface interface { + Create(*v1alpha1.LVMNode) (*v1alpha1.LVMNode, error) + Update(*v1alpha1.LVMNode) (*v1alpha1.LVMNode, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.LVMNode, error) + List(opts v1.ListOptions) (*v1alpha1.LVMNodeList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.LVMNode, err error) + LVMNodeExpansion +} + +// lVMNodes implements LVMNodeInterface +type lVMNodes struct { + client rest.Interface + ns string +} + +// newLVMNodes returns a LVMNodes +func newLVMNodes(c *LocalV1alpha1Client, namespace string) *lVMNodes { + return &lVMNodes{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the lVMNode, and returns the corresponding lVMNode object, and an error if there is any. +func (c *lVMNodes) Get(name string, options v1.GetOptions) (result *v1alpha1.LVMNode, err error) { + result = &v1alpha1.LVMNode{} + err = c.client.Get(). + Namespace(c.ns). + Resource("lvmnodes"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of LVMNodes that match those selectors. +func (c *lVMNodes) List(opts v1.ListOptions) (result *v1alpha1.LVMNodeList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.LVMNodeList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("lvmnodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested lVMNodes. +func (c *lVMNodes) Watch(opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("lvmnodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch() +} + +// Create takes the representation of a lVMNode and creates it. Returns the server's representation of the lVMNode, and an error, if there is any. +func (c *lVMNodes) Create(lVMNode *v1alpha1.LVMNode) (result *v1alpha1.LVMNode, err error) { + result = &v1alpha1.LVMNode{} + err = c.client.Post(). + Namespace(c.ns). + Resource("lvmnodes"). + Body(lVMNode). + Do(). + Into(result) + return +} + +// Update takes the representation of a lVMNode and updates it. Returns the server's representation of the lVMNode, and an error, if there is any. +func (c *lVMNodes) Update(lVMNode *v1alpha1.LVMNode) (result *v1alpha1.LVMNode, err error) { + result = &v1alpha1.LVMNode{} + err = c.client.Put(). + Namespace(c.ns). + Resource("lvmnodes"). + Name(lVMNode.Name). + Body(lVMNode). + Do(). + Into(result) + return +} + +// Delete takes name of the lVMNode and deletes it. Returns an error if one occurs. +func (c *lVMNodes) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("lvmnodes"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *lVMNodes) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + var timeout time.Duration + if listOptions.TimeoutSeconds != nil { + timeout = time.Duration(*listOptions.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("lvmnodes"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Timeout(timeout). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched lVMNode. +func (c *lVMNodes) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.LVMNode, err error) { + result = &v1alpha1.LVMNode{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("lvmnodes"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/generated/informer/externalversions/generic.go b/pkg/generated/informer/externalversions/generic.go index df392d41..2c126da7 100644 --- a/pkg/generated/informer/externalversions/generic.go +++ b/pkg/generated/informer/externalversions/generic.go @@ -53,6 +53,8 @@ func (f *genericInformer) Lister() cache.GenericLister { func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) { switch resource { // Group=local.openebs.io, Version=v1alpha1 + case v1alpha1.SchemeGroupVersion.WithResource("lvmnodes"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Local().V1alpha1().LVMNodes().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("lvmsnapshots"): return &genericInformer{resource: resource.GroupResource(), informer: f.Local().V1alpha1().LVMSnapshots().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("lvmvolumes"): diff --git a/pkg/generated/informer/externalversions/lvm/v1alpha1/interface.go b/pkg/generated/informer/externalversions/lvm/v1alpha1/interface.go index f72897a3..c2519c5c 100644 --- a/pkg/generated/informer/externalversions/lvm/v1alpha1/interface.go +++ b/pkg/generated/informer/externalversions/lvm/v1alpha1/interface.go @@ -24,6 +24,8 @@ import ( // Interface provides access to all the informers in this group version. type Interface interface { + // LVMNodes returns a LVMNodeInformer. + LVMNodes() LVMNodeInformer // LVMSnapshots returns a LVMSnapshotInformer. LVMSnapshots() LVMSnapshotInformer // LVMVolumes returns a LVMVolumeInformer. @@ -41,6 +43,11 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} } +// LVMNodes returns a LVMNodeInformer. +func (v *version) LVMNodes() LVMNodeInformer { + return &lVMNodeInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // LVMSnapshots returns a LVMSnapshotInformer. func (v *version) LVMSnapshots() LVMSnapshotInformer { return &lVMSnapshotInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/generated/informer/externalversions/lvm/v1alpha1/lvmnode.go b/pkg/generated/informer/externalversions/lvm/v1alpha1/lvmnode.go new file mode 100644 index 00000000..4773dcaf --- /dev/null +++ b/pkg/generated/informer/externalversions/lvm/v1alpha1/lvmnode.go @@ -0,0 +1,89 @@ +/* +Copyright 2021 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + lvmv1alpha1 "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + internalclientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" + internalinterfaces "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions/internalinterfaces" + v1alpha1 "github.com/openebs/lvm-localpv/pkg/generated/lister/lvm/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// LVMNodeInformer provides access to a shared informer and lister for +// LVMNodes. +type LVMNodeInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.LVMNodeLister +} + +type lVMNodeInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewLVMNodeInformer constructs a new informer for LVMNode type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewLVMNodeInformer(client internalclientset.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredLVMNodeInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredLVMNodeInformer constructs a new informer for LVMNode type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredLVMNodeInformer(client internalclientset.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.LocalV1alpha1().LVMNodes(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.LocalV1alpha1().LVMNodes(namespace).Watch(options) + }, + }, + &lvmv1alpha1.LVMNode{}, + resyncPeriod, + indexers, + ) +} + +func (f *lVMNodeInformer) defaultInformer(client internalclientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredLVMNodeInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *lVMNodeInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&lvmv1alpha1.LVMNode{}, f.defaultInformer) +} + +func (f *lVMNodeInformer) Lister() v1alpha1.LVMNodeLister { + return v1alpha1.NewLVMNodeLister(f.Informer().GetIndexer()) +} diff --git a/pkg/generated/lister/lvm/v1alpha1/expansion_generated.go b/pkg/generated/lister/lvm/v1alpha1/expansion_generated.go index 0513aa49..66be1551 100644 --- a/pkg/generated/lister/lvm/v1alpha1/expansion_generated.go +++ b/pkg/generated/lister/lvm/v1alpha1/expansion_generated.go @@ -18,6 +18,14 @@ limitations under the License. package v1alpha1 +// LVMNodeListerExpansion allows custom methods to be added to +// LVMNodeLister. +type LVMNodeListerExpansion interface{} + +// LVMNodeNamespaceListerExpansion allows custom methods to be added to +// LVMNodeNamespaceLister. +type LVMNodeNamespaceListerExpansion interface{} + // LVMSnapshotListerExpansion allows custom methods to be added to // LVMSnapshotLister. type LVMSnapshotListerExpansion interface{} diff --git a/pkg/generated/lister/lvm/v1alpha1/lvmnode.go b/pkg/generated/lister/lvm/v1alpha1/lvmnode.go new file mode 100644 index 00000000..6058890b --- /dev/null +++ b/pkg/generated/lister/lvm/v1alpha1/lvmnode.go @@ -0,0 +1,94 @@ +/* +Copyright 2021 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// LVMNodeLister helps list LVMNodes. +type LVMNodeLister interface { + // List lists all LVMNodes in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.LVMNode, err error) + // LVMNodes returns an object that can list and get LVMNodes. + LVMNodes(namespace string) LVMNodeNamespaceLister + LVMNodeListerExpansion +} + +// lVMNodeLister implements the LVMNodeLister interface. +type lVMNodeLister struct { + indexer cache.Indexer +} + +// NewLVMNodeLister returns a new LVMNodeLister. +func NewLVMNodeLister(indexer cache.Indexer) LVMNodeLister { + return &lVMNodeLister{indexer: indexer} +} + +// List lists all LVMNodes in the indexer. +func (s *lVMNodeLister) List(selector labels.Selector) (ret []*v1alpha1.LVMNode, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.LVMNode)) + }) + return ret, err +} + +// LVMNodes returns an object that can list and get LVMNodes. +func (s *lVMNodeLister) LVMNodes(namespace string) LVMNodeNamespaceLister { + return lVMNodeNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// LVMNodeNamespaceLister helps list and get LVMNodes. +type LVMNodeNamespaceLister interface { + // List lists all LVMNodes in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1alpha1.LVMNode, err error) + // Get retrieves the LVMNode from the indexer for a given namespace and name. + Get(name string) (*v1alpha1.LVMNode, error) + LVMNodeNamespaceListerExpansion +} + +// lVMNodeNamespaceLister implements the LVMNodeNamespaceLister +// interface. +type lVMNodeNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all LVMNodes in the indexer for a given namespace. +func (s lVMNodeNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.LVMNode, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.LVMNode)) + }) + return ret, err +} + +// Get retrieves the LVMNode from the indexer for a given namespace and name. +func (s lVMNodeNamespaceLister) Get(name string) (*v1alpha1.LVMNode, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("lvmnode"), name) + } + return obj.(*v1alpha1.LVMNode), nil +} diff --git a/pkg/lvm/lvm_util.go b/pkg/lvm/lvm_util.go index 3337a03e..0fddbcf3 100644 --- a/pkg/lvm/lvm_util.go +++ b/pkg/lvm/lvm_util.go @@ -17,12 +17,16 @@ limitations under the License. package lvm import ( + "encoding/json" + "fmt" "os" "os/exec" + "strconv" "strings" apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/klog" ) @@ -35,6 +39,8 @@ const ( // lvm command related constants const ( VGCreate = "vgcreate" + VGList = "vgs" + LVCreate = "lvcreate" LVRemove = "lvremove" LVExtend = "lvextend" @@ -265,3 +271,82 @@ func DestroySnapshot(snap *apis.LVMSnapshot) error { func getLVMSnapName(snapName string) string { return strings.TrimPrefix(snapName, "snapshot-") } + +func decodeVgsJSON(raw []byte) ([]apis.VolumeGroup, error) { + output := &struct { + Report []struct { + VolumeGroups []map[string]string `json:"vg"` + } `json:"report"` + }{} + var err error + if err = json.Unmarshal(raw, output); err != nil { + return nil, err + } + + if len(output.Report) != 1 { + return nil, fmt.Errorf("expected exactly one lvm report") + } + + items := output.Report[0].VolumeGroups + vgs := make([]apis.VolumeGroup, 0, len(items)) + for _, item := range items { + var vg apis.VolumeGroup + if vg, err = parseVolumeGroup(item); err != nil { + return vgs, err + } + vgs = append(vgs, vg) + } + return vgs, nil +} + +func parseVolumeGroup(m map[string]string) (apis.VolumeGroup, error) { + var vg apis.VolumeGroup + vg.Name = m["vg_name"] + vg.UUID = m["vg_uuid"] + + int32Map := map[string]*int32{ + "pv_count": &vg.PVCount, + "lv_count": &vg.LVCount, + } + for key, value := range int32Map { + count, err := strconv.Atoi(m[key]) + if err != nil { + err = fmt.Errorf("invalid format of %v=%v for vg %v: %v", key, m[key], vg.Name, err) + } + *value = int32(count) + } + + resQuantityMap := map[string]*resource.Quantity{ + "vg_size": &vg.Size, + "vg_free": &vg.Free, + } + + for key, value := range resQuantityMap { + sizeBytes, err := strconv.ParseInt( + strings.TrimSuffix(strings.ToLower(m[key]), "b"), + 10, 64) + if err != nil { + err = fmt.Errorf("invalid format of %v=%v for vg %v: %v", key, m[key], vg.Name, err) + } + quantity := resource.NewQuantity(sizeBytes, resource.BinarySI) + *value = *quantity // + } + return vg, nil +} + +// ListLVMVolumeGroup invokes `vgs` to list all the available volume +// groups in the node. +func ListLVMVolumeGroup() ([]apis.VolumeGroup, error) { + args := []string{ + "--options", "vg_all", + "--reportformat", "json", + "--units", "b", + } + cmd := exec.Command(VGList, args...) + output, err := cmd.CombinedOutput() + if err != nil { + klog.Errorf("lvm: list volume group cmd %v: %v", args, err) + return nil, err + } + return decodeVgsJSON(output) +} diff --git a/pkg/mgmt/lvmnode/builder.go b/pkg/mgmt/lvmnode/builder.go new file mode 100644 index 00000000..953c9f4f --- /dev/null +++ b/pkg/mgmt/lvmnode/builder.go @@ -0,0 +1,157 @@ +/* +Copyright 2019 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lvmnode + +import ( + "time" + + clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" + openebsScheme "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset/scheme" + informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" + listers "github.com/openebs/lvm-localpv/pkg/generated/lister/lvm/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" +) + +const controllerAgentName = "lvmnode-controller" + +// NodeController is the controller implementation for lvm node resources +type NodeController struct { + // kubeclientset is a standard kubernetes clientset + kubeclientset kubernetes.Interface + + // clientset is a openebs custom resource package generated for custom API group. + clientset clientset.Interface + + NodeLister listers.LVMNodeLister + + // NodeSynced is used for caches sync to get populated + NodeSynced cache.InformerSynced + + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder + + // pollInterval controls the polling frequency of syncing up the vg metadata. + pollInterval time.Duration + + // ownerRef is used to set the owner reference to lvmnode objects. + ownerRef metav1.OwnerReference +} + +// NodeControllerBuilder is the builder object for controller. +type NodeControllerBuilder struct { + NodeController *NodeController +} + +// NewNodeControllerBuilder returns an empty instance of controller builder. +func NewNodeControllerBuilder() *NodeControllerBuilder { + return &NodeControllerBuilder{ + NodeController: &NodeController{}, + } +} + +// withKubeClient fills kube client to controller object. +func (cb *NodeControllerBuilder) withKubeClient(ks kubernetes.Interface) *NodeControllerBuilder { + cb.NodeController.kubeclientset = ks + return cb +} + +// withOpenEBSClient fills openebs client to controller object. +func (cb *NodeControllerBuilder) withOpenEBSClient(cs clientset.Interface) *NodeControllerBuilder { + cb.NodeController.clientset = cs + return cb +} + +// withNodeLister fills Node lister to controller object. +func (cb *NodeControllerBuilder) withNodeLister(sl informers.SharedInformerFactory) *NodeControllerBuilder { + NodeInformer := sl.Local().V1alpha1().LVMNodes() + cb.NodeController.NodeLister = NodeInformer.Lister() + return cb +} + +// withNodeSynced adds object sync information in cache to controller object. +func (cb *NodeControllerBuilder) withNodeSynced(sl informers.SharedInformerFactory) *NodeControllerBuilder { + NodeInformer := sl.Local().V1alpha1().LVMNodes() + cb.NodeController.NodeSynced = NodeInformer.Informer().HasSynced + return cb +} + +// withWorkqueue adds workqueue to controller object. +func (cb *NodeControllerBuilder) withWorkqueueRateLimiting() *NodeControllerBuilder { + cb.NodeController.workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Node") + return cb +} + +// withRecorder adds recorder to controller object. +func (cb *NodeControllerBuilder) withRecorder(ks kubernetes.Interface) *NodeControllerBuilder { + klog.Infof("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ks.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + cb.NodeController.recorder = recorder + return cb +} + +// withEventHandler adds event handlers controller object. +func (cb *NodeControllerBuilder) withEventHandler(cvcInformerFactory informers.SharedInformerFactory) *NodeControllerBuilder { + cvcInformer := cvcInformerFactory.Local().V1alpha1().LVMNodes() + // Set up an event handler for when lvm node vg change. + // Note: rather than setting up the resync period at informer level, + // we are controlling the syncing based on pollInternal. See + // NodeController#Run func for more details. + cvcInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: cb.NodeController.addNode, + UpdateFunc: cb.NodeController.updateNode, + DeleteFunc: cb.NodeController.deleteNode, + }, 0) + return cb +} + +func (cb *NodeControllerBuilder) withPollInterval(interval time.Duration) *NodeControllerBuilder { + cb.NodeController.pollInterval = interval + return cb +} + +func (cb *NodeControllerBuilder) withOwnerReference(ownerRef metav1.OwnerReference) *NodeControllerBuilder { + cb.NodeController.ownerRef = ownerRef + return cb +} + +// Build returns a controller instance. +func (cb *NodeControllerBuilder) Build() (*NodeController, error) { + err := openebsScheme.AddToScheme(scheme.Scheme) + if err != nil { + return nil, err + } + return cb.NodeController, nil +} diff --git a/pkg/mgmt/lvmnode/lvmnode.go b/pkg/mgmt/lvmnode/lvmnode.go new file mode 100644 index 00000000..3f936968 --- /dev/null +++ b/pkg/mgmt/lvmnode/lvmnode.go @@ -0,0 +1,307 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package lvmnode + +import ( + "fmt" + "reflect" + "time" + + apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + "github.com/openebs/lvm-localpv/pkg/builder/nodebuilder" + "github.com/openebs/lvm-localpv/pkg/equality" + "github.com/openebs/lvm-localpv/pkg/lvm" + k8serror "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +func (c *NodeController) listLVMVolumeGroup() ([]apis.VolumeGroup, error) { + return lvm.ListLVMVolumeGroup() +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. +func (c *NodeController) syncHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + return c.syncNode(namespace, name) +} + +// synNode is the function which tries to converge to a desired state for the +// LVMNode +func (c *NodeController) syncNode(namespace string, name string) error { + // Get the node resource with this namespace/name + cachedNode, err := c.NodeLister.LVMNodes(namespace).Get(name) + if err != nil && !k8serror.IsNotFound(err) { + return err + } + var node *apis.LVMNode + if cachedNode != nil { + node = cachedNode.DeepCopy() + } + + vgs, err := c.listLVMVolumeGroup() + if err != nil { + return err + } + + if node == nil { // if it doesn't exists, create lvm node object + if node, err = nodebuilder.NewBuilder(). + WithNamespace(namespace).WithName(name). + WithVolumeGroups(vgs). + WithOwnerReferences(c.ownerRef). + Build(); err != nil { + return err + } + + klog.Infof("lvm node controller: creating new node object for %+v", node) + if node, err = nodebuilder.NewKubeclient().WithNamespace(namespace).Create(node); err != nil { + return fmt.Errorf("create lvm node %s/%s: %v", namespace, name, err) + } + klog.Infof("lvm node controller: created node object %s/%s", namespace, name) + return nil + } + + // lvm node already exists check if we need to update it. + var updateRequired bool + // validate if owner reference updated. + if ownerRefs, req := c.isOwnerRefsUpdateRequired(node.OwnerReferences); req { + klog.Infof("lvm node controller: node owner references updated current=%+v, required=%+v", + node.OwnerReferences, ownerRefs) + node.OwnerReferences = ownerRefs + updateRequired = true + } + + // validate if node volume groups are upto date. + if !equality.Semantic.DeepEqual(node.VolumeGroups, vgs) { + klog.Infof("lvm node controller: node volume groups updated current=%+v, required=%+v", + node.VolumeGroups, vgs) + node.VolumeGroups = vgs + updateRequired = true + } + + if !updateRequired { + return nil + } + + klog.Infof("lvm node controller: updating node object with %+v", node) + if node, err = nodebuilder.NewKubeclient().WithNamespace(namespace).Update(node); err != nil { + return fmt.Errorf("update lvm node %s/%s: %v", namespace, name, err) + } + klog.Infof("lvm node controller: updated node object %s/%s", namespace, name) + + return nil +} + +// addNode is the add event handler for LVMNode +func (c *NodeController) addNode(obj interface{}) { + node, ok := obj.(*apis.LVMNode) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get node object %#v", obj)) + return + } + + klog.Infof("Got add event for lvm node %s/%s", node.Namespace, node.Name) + c.enqueueNode(node) +} + +// updateNode is the update event handler for LVMNode +func (c *NodeController) updateNode(oldObj, newObj interface{}) { + newNode, ok := newObj.(*apis.LVMNode) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get node object %#v", newNode)) + return + } + + klog.Infof("Got update event for lvm node %s/%s", newNode.Namespace, newNode.Name) + c.enqueueNode(newNode) +} + +// deleteNode is the delete event handler for LVMNode +func (c *NodeController) deleteNode(obj interface{}) { + node, ok := obj.(*apis.LVMNode) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return + } + node, ok = tombstone.Obj.(*apis.LVMNode) + if !ok { + runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a LVMNode %#v", obj)) + return + } + } + + klog.Infof("Got delete event for node %s/%s", node.Namespace, node.Name) + c.enqueueNode(node) +} + +// enqueueNode takes a LVMNode resource and converts it into a namespace/name +// string which is then put onto the work queue. This method should *not* be +// passed resources of any type other than LVMNode. +func (c *NodeController) enqueueNode(node *apis.LVMNode) { + // node must exists in openebs namespace & must equal to the node id. + if node.Namespace != lvm.LvmNamespace || + node.Name != lvm.NodeID { + klog.Warningf("skipping lvm node object %s/%s", node.Namespace, node.Name) + return + } + + key, err := cache.MetaNamespaceKeyFunc(node) + if err != nil { + runtime.HandleError(err) + return + } + c.workqueue.Add(key) +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. +func (c *NodeController) Run(threadiness int, stopCh <-chan struct{}) error { + defer runtime.HandleCrash() + defer c.workqueue.ShutDown() + + // Start the informer factories to begin populating the informer caches + klog.Info("Starting Node controller") + + // Wait for the k8s caches to be synced before starting workers + klog.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.NodeSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + klog.Info("Starting Node workers") + // Launch worker to process Node resources + // Threadiness will decide the number of workers you want to launch to process work items from queue + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + klog.Info("Started Node workers") + + timer := time.NewTimer(0) + defer timer.Stop() + for { + select { + case <-timer.C: + case <-stopCh: + klog.Info("Shutting down Node controller") + return nil + } + item := lvm.LvmNamespace + "/" + lvm.NodeID + c.workqueue.Add(item) // add the item to worker queue. + timer.Reset(c.pollInterval) + } +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (c *NodeController) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (c *NodeController) processNextWorkItem() bool { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.workqueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.workqueue.Forget(obj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the syncHandler, passing it the namespace/name string of the + // Node resource to be synced. + if err := c.syncHandler(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.workqueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(obj) + klog.Infof("Successfully synced '%s'", key) + return nil + }(obj) + + if err != nil { + runtime.HandleError(err) + return true + } + + return true +} + +// isOwnerRefUpdateRequired validates if relevant owner references is being +// set for lvm node. If not, it returns the final owner references that needs +// to be set. +func (c *NodeController) isOwnerRefsUpdateRequired(ownerRefs []metav1.OwnerReference) ([]metav1.OwnerReference, bool) { + updated := false + reqOwnerRef := c.ownerRef + for idx := range ownerRefs { + if ownerRefs[idx].UID != reqOwnerRef.UID { + continue + } + // in case owner reference exists, validate + // if controller field is set correctly or not. + if !reflect.DeepEqual(ownerRefs[idx].Controller, reqOwnerRef.Controller) { + updated = true + ownerRefs[idx].Controller = reqOwnerRef.Controller + } + return ownerRefs, updated + } + updated = true + ownerRefs = append(ownerRefs, reqOwnerRef) + return ownerRefs, updated +} diff --git a/pkg/mgmt/lvmnode/start.go b/pkg/mgmt/lvmnode/start.go new file mode 100644 index 00000000..cd5ee204 --- /dev/null +++ b/pkg/mgmt/lvmnode/start.go @@ -0,0 +1,108 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package lvmnode + +import ( + "sync" + "time" + + k8sapi "github.com/openebs/lib-csi/pkg/client/k8s" + clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" + informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" + "github.com/openebs/lvm-localpv/pkg/lvm" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" +) + +// Start starts the lvmnode controller. +func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { + + // Get in cluster config + cfg, err := k8sapi.Config().Get() + if err != nil { + return errors.Wrap(err, "error building kubeconfig") + } + + // Building Kubernetes Clientset + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "error building kubernetes clientset") + } + + // Building OpenEBS Clientset + openebsClient, err := clientset.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "error building openebs clientset") + } + + // setup watch only on node we are interested in. + nodeInformerFactory := informers.NewSharedInformerFactoryWithOptions( + openebsClient, 0, informers.WithNamespace(lvm.LvmNamespace), + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", lvm.NodeID).String() + })) + + k8sNode, err := kubeClient.CoreV1().Nodes().Get(lvm.NodeID, metav1.GetOptions{}) + if err != nil { + return errors.Wrapf(err, "fetch k8s node %s", lvm.NodeID) + } + isTrue := true + // as object returned by client go clears all TypeMeta from it. + nodeGVK := &schema.GroupVersionKind{ + Group: "", Version: "v1", Kind: "Node", + } + ownerRef := metav1.OwnerReference{ + APIVersion: nodeGVK.GroupVersion().String(), + Kind: nodeGVK.Kind, + Name: k8sNode.Name, + UID: k8sNode.GetUID(), + Controller: &isTrue, + } + + // Build() fn of all controllers calls AddToScheme to adds all types of this + // clientset into the given scheme. + // If multiple controllers happen to call this AddToScheme same time, + // it causes panic with error saying concurrent map access. + // This lock is used to serialize the AddToScheme call of all controllers. + controllerMtx.Lock() + + controller, err := NewNodeControllerBuilder(). + withKubeClient(kubeClient). + withOpenEBSClient(openebsClient). + withNodeSynced(nodeInformerFactory). + withNodeLister(nodeInformerFactory). + withRecorder(kubeClient). + withEventHandler(nodeInformerFactory). + withPollInterval(60 * time.Second). + withOwnerReference(ownerRef). + withWorkqueueRateLimiting().Build() + + // blocking call, can't use defer to release the lock + controllerMtx.Unlock() + + if err != nil { + return errors.Wrapf(err, "error building controller instance") + } + + nodeInformerFactory.Start(stopCh) + + // Threadiness defines the number of workers to be launched in Run function + return controller.Run(1, stopCh) +}