Make createEndpointService() and deleteEndpointService() plugin interface methods. #45528

Merged · 1 commit · May 16, 2017
272 changes: 162 additions & 110 deletions pkg/volume/glusterfs/glusterfs.go
@@ -80,6 +80,9 @@ const (
// absGidMin <= defGidMin <= defGidMax <= absGidMax
absoluteGidMin = 2000
absoluteGidMax = math.MaxInt32
heketiAnn = "heketi-dynamic-provisioner"
glusterTypeAnn = "gluster.org/type"
glusterDescAnn = "Gluster: Dynamically provisioned PV"
)

func (plugin *glusterfsPlugin) Init(host volume.VolumeHost) error {
@@ -141,12 +144,57 @@ func (plugin *glusterfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volu
if kubeClient == nil {
return nil, fmt.Errorf("glusterfs: failed to get kube client to initialize mounter")
}

ep, err := kubeClient.Core().Endpoints(ns).Get(epName, metav1.GetOptions{})
if err != nil {
glog.Errorf("glusterfs: failed to get endpoints %s[%v]", epName, err)
return nil, err
if err != nil && errors.IsNotFound(err) {
Contributor: a bit more paranoid but validate that epName is created using the same schema that provision uses.

Contributor (author): Done.

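// Recovery below applies only to dynamically provisioned volumes: their
// endpoint names follow the provisioner's schema (dynamicEpSvcPrefix plus
// the PVC name), which is exactly what this check enforces.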
claim := spec.PersistentVolume.Spec.ClaimRef.Name
checkEpName := dynamicEpSvcPrefix + claim
if epName != checkEpName {
return nil, fmt.Errorf("failed to get endpoint %s, error %v", epName, err)
}
glog.Errorf("glusterfs: failed to get endpoint %s[%v]", epName, err)
if spec != nil && spec.PersistentVolume.Annotations["kubernetes.io/createdby"] == heketiAnn {
class, err := volutil.GetClassForVolume(plugin.host.GetKubeClient(), spec.PersistentVolume)
Member: nodes cannot look up arbitrary secrets based on values in storage class parameters...

Member: I also thought only the create/delete, attach/detach phases were supposed to resolve storage class info... isn't mount only supposed to use info in the PV?

if err != nil {
return nil, fmt.Errorf("glusterfs: failed to get storageclass, error: %v", err)
}

cfg, err := parseClassParameters(class.Parameters, plugin.host.GetKubeClient())
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to parse parameters, error: %v", err)
}

scConfig := *cfg
cli := gcli.NewClient(scConfig.url, scConfig.user, scConfig.secretValue)
if cli == nil {
return nil, fmt.Errorf("glusterfs: failed to create heketi client, error: %v", err)
}

volumeID := dstrings.TrimPrefix(source.Path, volPrefix)
volInfo, err := cli.VolumeInfo(volumeID)
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to get volume info, error: %v", err)
}

endpointIPs, err := getClusterNodes(cli, volInfo.Cluster)
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to get cluster nodes, error: %v", err)
}

// Attempt to recreate the endpoint/service.
_, _, err = plugin.createEndpointService(ns, epName, endpointIPs, claim)
Member: nodes do not have permission to create endpoints or services... this will fail on any cluster running RBAC

if err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("glusterfs: failed to recreate endpoint/service, error: %v", err)
return nil, fmt.Errorf("failed to recreate endpoint/service, error: %v", err)
}
glog.V(3).Infof("glusterfs: endpoint/service [%v] successfully recreated ", epName)
} else {
return nil, err
}
}
glog.V(1).Infof("glusterfs: endpoints %v", ep)

return plugin.newMounterInternal(spec, ep, pod, plugin.host.GetMounter(), exec.New())
}
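
The RBAC objection above can be made concrete. Below is a minimal sketch, assuming the present-day k8s.io/api and k8s.io/apimachinery packages, of the kind of grant a mount-time caller would need before this recovery path could succeed; the role name is hypothetical and nothing in this snippet is part of the PR.

	package main

	import (
		"fmt"

		rbacv1 "k8s.io/api/rbac/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	// glusterfsRecoveryRole sketches the permissions the recovery path in
	// NewMounter exercises: reading Endpoints, and recreating Endpoints and
	// Services when they are missing. The role name is illustrative only.
	func glusterfsRecoveryRole() *rbacv1.ClusterRole {
		return &rbacv1.ClusterRole{
			ObjectMeta: metav1.ObjectMeta{Name: "glusterfs-endpoint-recovery"},
			Rules: []rbacv1.PolicyRule{{
				APIGroups: []string{""}, // "" selects the core API group
				Resources: []string{"endpoints", "services"},
				Verbs:     []string{"get", "create"},
			}},
		}
	}

	func main() {
		role := glusterfsRecoveryRole()
		fmt.Printf("role %q grants %+v\n", role.Name, role.Rules)
	}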

@@ -567,6 +615,77 @@ func (p *glusterfsPlugin) getGidTable(className string, min int, max int) (*MinM
return newGidTable, nil
}

// createEndpointService creates an endpoint and a service in the provided namespace.
func (plugin *glusterfsPlugin) createEndpointService(namespace string, epServiceName string, hostips []string, pvcname string) (endpoint *v1.Endpoints, service *v1.Service, err error) {

addrlist := make([]v1.EndpointAddress, len(hostips))
for i, v := range hostips {
addrlist[i].IP = v
}
endpoint = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: epServiceName,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: addrlist,
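// NOTE: the port value below is a placeholder; the Gluster mount does not
// use it, but the Endpoints API requires a legal, positive port number.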
Ports: []v1.EndpointPort{{Port: 1, Protocol: "TCP"}},
}},
}
kubeClient := plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, nil, fmt.Errorf("glusterfs: failed to get kube client when creating endpoint service")
}
_, err = kubeClient.Core().Endpoints(namespace).Create(endpoint)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: endpoint [%s] already exist in namespace [%s]", endpoint, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create endpoint: %v", err)
return nil, nil, fmt.Errorf("error creating endpoint: %v", err)
}
service = &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: epServiceName,
Namespace: namespace,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{Protocol: "TCP", Port: 1}}}}
_, err = kubeClient.Core().Services(namespace).Create(service)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: service [%s] already exist in namespace [%s]", service, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create service: %v", err)
return nil, nil, fmt.Errorf("error creating service: %v", err)
}
return endpoint, service, nil
}

// deleteEndpointService deletes the endpoint and service from the provided namespace.
func (plugin *glusterfsPlugin) deleteEndpointService(namespace string, epServiceName string) (err error) {
kubeClient := plugin.host.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("glusterfs: failed to get kube client when deleting endpoint service")
}
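// Only the Service is deleted explicitly; the matching Endpoints object is
// expected to be garbage-collected by the endpoints controller once its
// Service is gone.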
err = kubeClient.Core().Services(namespace).Delete(epServiceName, nil)
if err != nil {
glog.Errorf("glusterfs: error deleting service %s/%s: %v", namespace, epServiceName, err)
return fmt.Errorf("error deleting service %s/%s: %v", namespace, epServiceName, err)
}
glog.V(1).Infof("glusterfs: service/endpoint %s/%s deleted successfully", namespace, epServiceName)
return nil
}

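With both helpers promoted to plugin methods, the provisioner's CreateVolume and the recovery path in NewMounter share a single implementation. A sketch of a common caller, using only names from this diff plus a hypothetical wrapper:

	// recreateForPVC is a hypothetical convenience wrapper, not part of this
	// PR; it derives the endpoint/service name the same way Provision does.
	func recreateForPVC(plugin *glusterfsPlugin, namespace, pvcName string, hostIPs []string) error {
		epServiceName := dynamicEpSvcPrefix + pvcName
		_, _, err := plugin.createEndpointService(namespace, epServiceName, hostIPs, pvcName)
		return err
	}
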
func (d *glusterfsVolumeDeleter) getGid() (int, bool, error) {
gidStr, ok := d.spec.Annotations[volumehelper.VolumeGidAnnotationKey]

@@ -640,7 +759,7 @@ func (d *glusterfsVolumeDeleter) Delete() error {
dynamicEndpoint = pvSpec.Glusterfs.EndpointsName
}
glog.V(3).Infof("glusterfs: dynamic namespace and endpoint : [%v/%v]", dynamicNamespace, dynamicEndpoint)
err = d.deleteEndpointService(dynamicNamespace, dynamicEndpoint)
err = d.plugin.deleteEndpointService(dynamicNamespace, dynamicEndpoint)
if err != nil {
glog.Errorf("glusterfs: error when deleting endpoint/service :%v", err)
} else {
@@ -685,7 +804,7 @@ func (r *glusterfsVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
}

glog.Errorf("glusterfs: create volume err: %v.", err)
return nil, fmt.Errorf("glusterfs: create volume err: %v.", err)
return nil, fmt.Errorf("glusterfs: create volume err: %v", err)
}
pv := new(v1.PersistentVolume)
pv.Spec.PersistentVolumeSource.Glusterfs = glusterfs
@@ -699,9 +818,9 @@ func (r *glusterfsVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {

pv.Annotations = map[string]string{
volumehelper.VolumeGidAnnotationKey: gidStr,
"kubernetes.io/createdby": "heketi-dynamic-provisioner",
"gluster.org/type": "file",
"Description": "Gluster: Dynamically provisioned PV",
"kubernetes.io/createdby": heketiAnn,
glusterTypeAnn: "file",
"Description": glusterDescAnn,
}

pv.Spec.Capacity = v1.ResourceList{
@@ -710,33 +829,6 @@ func (r *glusterfsVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
return pv, nil
}

func (p *glusterfsVolumeProvisioner) GetClusterNodes(cli *gcli.Client, cluster string) (dynamicHostIps []string, err error) {
clusterinfo, err := cli.ClusterInfo(cluster)
if err != nil {
glog.Errorf("glusterfs: failed to get cluster details: %v", err)
return nil, fmt.Errorf("failed to get cluster details: %v", err)
}

// For the dynamically provisioned volume, we gather the list of node IPs
// of the cluster on which provisioned volume belongs to, as there can be multiple
// clusters.
for _, node := range clusterinfo.Nodes {
nodei, err := cli.NodeInfo(string(node))
if err != nil {
glog.Errorf("glusterfs: failed to get hostip: %v", err)
return nil, fmt.Errorf("failed to get hostip: %v", err)
}
ipaddr := dstrings.Join(nodei.NodeAddRequest.Hostnames.Storage, "")
dynamicHostIps = append(dynamicHostIps, ipaddr)
}
glog.V(3).Infof("glusterfs: hostlist :%v", dynamicHostIps)
if len(dynamicHostIps) == 0 {
glog.Errorf("glusterfs: no hosts found: %v", err)
return nil, fmt.Errorf("no hosts found: %v", err)
}
return dynamicHostIps, nil
}

func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolumeSource, size int, err error) {
var clusterIDs []string
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
@@ -764,7 +856,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum
return nil, 0, fmt.Errorf("error creating volume %v", err)
}
glog.V(1).Infof("glusterfs: volume with size: %d and name: %s created", volume.Size, volume.Name)
dynamicHostIps, err := p.GetClusterNodes(cli, volume.Cluster)
dynamicHostIps, err := getClusterNodes(cli, volume.Cluster)
if err != nil {
glog.Errorf("glusterfs: error [%v] when getting cluster nodes for volume %s", err, volume)
return nil, 0, fmt.Errorf("error [%v] when getting cluster nodes for volume %s", err, volume)
@@ -776,7 +868,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum
// of volume creation.
epServiceName := dynamicEpSvcPrefix + p.options.PVC.Name
epNamespace := p.options.PVC.Namespace
endpoint, service, err := p.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC.Name)
endpoint, service, err := p.plugin.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC.Name)
if err != nil {
glog.Errorf("glusterfs: failed to create endpoint/service: %v", err)
deleteErr := cli.VolumeDelete(volume.Id)
@@ -793,75 +885,6 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum
}, sz, nil
}

func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epServiceName string, hostips []string, pvcname string) (endpoint *v1.Endpoints, service *v1.Service, err error) {

addrlist := make([]v1.EndpointAddress, len(hostips))
for i, v := range hostips {
addrlist[i].IP = v
}
endpoint = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: epServiceName,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: addrlist,
Ports: []v1.EndpointPort{{Port: 1, Protocol: "TCP"}},
}},
}
kubeClient := p.plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, nil, fmt.Errorf("glusterfs: failed to get kube client when creating endpoint service")
}
_, err = kubeClient.Core().Endpoints(namespace).Create(endpoint)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: endpoint [%s] already exist in namespace [%s]", endpoint, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create endpoint: %v", err)
return nil, nil, fmt.Errorf("error creating endpoint: %v", err)
}
service = &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: epServiceName,
Namespace: namespace,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{Protocol: "TCP", Port: 1}}}}
_, err = kubeClient.Core().Services(namespace).Create(service)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: service [%s] already exist in namespace [%s]", service, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create service: %v", err)
return nil, nil, fmt.Errorf("error creating service: %v", err)
}
return endpoint, service, nil
}

func (d *glusterfsVolumeDeleter) deleteEndpointService(namespace string, epServiceName string) (err error) {
kubeClient := d.plugin.host.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("glusterfs: failed to get kube client when deleting endpoint service")
}
err = kubeClient.Core().Services(namespace).Delete(epServiceName, nil)
if err != nil {
glog.Errorf("glusterfs: error deleting service %s/%s: %v", namespace, epServiceName, err)
return fmt.Errorf("error deleting service %s/%s: %v", namespace, epServiceName, err)
}
glog.V(1).Infof("glusterfs: service/endpoint %s/%s deleted successfully", namespace, epServiceName)
return nil
}

// parseSecret finds a given Secret instance and reads user password from it.
func parseSecret(namespace, secretName string, kubeClient clientset.Interface) (string, error) {
secretMap, err := volutil.GetSecretForPV(namespace, secretName, glusterfsPluginName, kubeClient)
@@ -957,7 +980,7 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa
if len(parseVolumeTypeInfo) >= 2 {
newReplicaCount, err := convertVolumeParam(parseVolumeTypeInfo[1])
if err != nil {
return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s.", err, parseVolumeTypeInfo[1], "volumetype", glusterfsPluginName)
return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s", err, parseVolumeTypeInfo[1], "volumetype", glusterfsPluginName)
}
cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityReplicate, Replicate: gapi.ReplicaDurability{Replica: newReplicaCount}}
} else {
Expand All @@ -967,11 +990,11 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa
if len(parseVolumeTypeInfo) >= 3 {
newDisperseData, err := convertVolumeParam(parseVolumeTypeInfo[1])
if err != nil {
return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s.", parseVolumeTypeInfo[1], err, "volumetype", glusterfsPluginName)
return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s", parseVolumeTypeInfo[1], err, "volumetype", glusterfsPluginName)
}
newDisperseRedundancy, err := convertVolumeParam(parseVolumeTypeInfo[2])
if err != nil {
return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s.", err, parseVolumeTypeInfo[2], "volumetype", glusterfsPluginName)
return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s", err, parseVolumeTypeInfo[2], "volumetype", glusterfsPluginName)
}
cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityEC, Disperse: gapi.DisperseDurability{Data: newDisperseData, Redundancy: newDisperseRedundancy}}
} else {
@@ -1011,3 +1034,32 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa

return &cfg, nil
}
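
For reference, a sketch of the parameter map parseClassParameters consumes; the URL, user, and secret names below are placeholders, not values from this PR, and the call assumes the surrounding plugin context from this file.

	// Hypothetical StorageClass parameters for the glusterfs provisioner.
	params := map[string]string{
		"resturl":         "http://heketi.example.com:8081", // placeholder heketi endpoint
		"restuser":        "admin",                          // placeholder REST user
		"secretName":      "heketi-secret",                  // placeholder secret with the user key
		"secretNamespace": "default",
		"volumetype":      "replicate:3", // or "disperse:<data>:<redundancy>", or "none"
	}
	cfg, err := parseClassParameters(params, plugin.host.GetKubeClient())
	if err != nil {
		return nil, fmt.Errorf("glusterfs: failed to parse parameters, error: %v", err)
	}
	_ = cfg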

// getClusterNodes returns the storage IP addresses of the nodes in the given cluster.
func getClusterNodes(cli *gcli.Client, cluster string) (dynamicHostIps []string, err error) {
clusterinfo, err := cli.ClusterInfo(cluster)
if err != nil {
glog.Errorf("glusterfs: failed to get cluster details: %v", err)
return nil, fmt.Errorf("failed to get cluster details: %v", err)
}

// For the dynamically provisioned volume, we gather the list of node IPs
// of the cluster to which the provisioned volume belongs, as there can be
// multiple clusters.
for _, node := range clusterinfo.Nodes {
nodei, err := cli.NodeInfo(string(node))
if err != nil {
glog.Errorf("glusterfs: failed to get hostip: %v", err)
return nil, fmt.Errorf("failed to get hostip: %v", err)
}
ipaddr := dstrings.Join(nodei.NodeAddRequest.Hostnames.Storage, "")
dynamicHostIps = append(dynamicHostIps, ipaddr)
}
glog.V(3).Infof("glusterfs: hostlist :%v", dynamicHostIps)
if len(dynamicHostIps) == 0 {
glog.Errorf("glusterfs: no hosts found: %v", err)
return nil, fmt.Errorf("no hosts found: %v", err)
}
return dynamicHostIps, nil
}
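
A short usage sketch for the relocated helper; the heketi URL, credentials, and cluster ID are placeholders, and the snippet assumes the gcli and glog imports already present in this file.

	cli := gcli.NewClient("http://heketi.example.com:8081", "admin", "my-secret-key")
	hostIPs, err := getClusterNodes(cli, "0995098e1284ddf0aea960f2f1f0e902") // hypothetical cluster ID
	if err != nil {
		return fmt.Errorf("glusterfs: failed to get cluster nodes, error: %v", err)
	}
	glog.V(3).Infof("glusterfs: cluster nodes resolved to %v", hostIPs)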