Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

manually revert #45528 #47726

Merged
merged 1 commit into from
Jun 20, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
268 changes: 108 additions & 160 deletions pkg/volume/glusterfs/glusterfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ const (
// absGidMin <= defGidMin <= defGidMax <= absGidMax
absoluteGidMin = 2000
absoluteGidMax = math.MaxInt32
heketiAnn = "heketi-dynamic-provisioner"
glusterTypeAnn = "gluster.org/type"
glusterDescAnn = "Gluster: Dynamically provisioned PV"
linuxGlusterMountBinary = "mount.glusterfs"
autoUnmountBinaryVer = "3.11"
)
Expand Down Expand Up @@ -146,57 +143,12 @@ func (plugin *glusterfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volu
if kubeClient == nil {
return nil, fmt.Errorf("glusterfs: failed to get kube client to initialize mounter")
}

ep, err := kubeClient.Core().Endpoints(ns).Get(epName, metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
claim := spec.PersistentVolume.Spec.ClaimRef.Name
checkEpName := dynamicEpSvcPrefix + claim
if epName != checkEpName {
return nil, fmt.Errorf("failed to get endpoint %s, error %v", epName, err)
}
glog.Errorf("glusterfs: failed to get endpoint %s[%v]", epName, err)
if spec != nil && spec.PersistentVolume.Annotations[volumehelper.VolumeDynamicallyCreatedByKey] == heketiAnn {
class, err := volutil.GetClassForVolume(plugin.host.GetKubeClient(), spec.PersistentVolume)
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to get storageclass, error: %v", err)
}

cfg, err := parseClassParameters(class.Parameters, plugin.host.GetKubeClient())
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to parse parameters, error: %v", err)
}

scConfig := *cfg
cli := gcli.NewClient(scConfig.url, scConfig.user, scConfig.secretValue)
if cli == nil {
return nil, fmt.Errorf("glusterfs: failed to create heketi client, error: %v", err)
}

volumeID := dstrings.TrimPrefix(source.Path, volPrefix)
volInfo, err := cli.VolumeInfo(volumeID)
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to get volume info, error: %v", err)
}

endpointIPs, err := getClusterNodes(cli, volInfo.Cluster)
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to get cluster nodes, error: %v", err)
}

// Give an attempt to recreate endpoint/service.

_, _, err = plugin.createEndpointService(ns, epName, endpointIPs, claim)

if err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("glusterfs: failed to recreate endpoint/service, error: %v", err)
return nil, fmt.Errorf("failed to recreate endpoint/service, error: %v", err)
}
glog.V(3).Infof("glusterfs: endpoint/service [%v] successfully recreated ", epName)
} else {
return nil, err
}
if err != nil {
glog.Errorf("glusterfs: failed to get endpoints %s[%v]", epName, err)
return nil, err
}

glog.V(1).Infof("glusterfs: endpoints %v", ep)
return plugin.newMounterInternal(spec, ep, pod, plugin.host.GetMounter(), exec.New())
}

Expand Down Expand Up @@ -634,77 +586,6 @@ func (plugin *glusterfsPlugin) getGidTable(className string, min int, max int) (
return newGidTable, nil
}

//createEndpointService create an endpoint and service in provided namespace.
func (plugin *glusterfsPlugin) createEndpointService(namespace string, epServiceName string, hostips []string, pvcname string) (endpoint *v1.Endpoints, service *v1.Service, err error) {

addrlist := make([]v1.EndpointAddress, len(hostips))
for i, v := range hostips {
addrlist[i].IP = v
}
endpoint = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: epServiceName,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: addrlist,
Ports: []v1.EndpointPort{{Port: 1, Protocol: "TCP"}},
}},
}
kubeClient := plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, nil, fmt.Errorf("glusterfs: failed to get kube client when creating endpoint service")
}
_, err = kubeClient.Core().Endpoints(namespace).Create(endpoint)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: endpoint [%s] already exist in namespace [%s]", endpoint, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create endpoint: %v", err)
return nil, nil, fmt.Errorf("error creating endpoint: %v", err)
}
service = &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: epServiceName,
Namespace: namespace,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{Protocol: "TCP", Port: 1}}}}
_, err = kubeClient.Core().Services(namespace).Create(service)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: service [%s] already exist in namespace [%s]", service, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create service: %v", err)
return nil, nil, fmt.Errorf("error creating service: %v", err)
}
return endpoint, service, nil
}

// deleteEndpointService delete the endpoint and service from the provided namespace.
func (plugin *glusterfsPlugin) deleteEndpointService(namespace string, epServiceName string) (err error) {
kubeClient := plugin.host.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("glusterfs: failed to get kube client when deleting endpoint service")
}
err = kubeClient.Core().Services(namespace).Delete(epServiceName, nil)
if err != nil {
glog.Errorf("glusterfs: error deleting service %s/%s: %v", namespace, epServiceName, err)
return fmt.Errorf("error deleting service %s/%s: %v", namespace, epServiceName, err)
}
glog.V(1).Infof("glusterfs: service/endpoint %s/%s deleted successfully", namespace, epServiceName)
return nil
}

func (d *glusterfsVolumeDeleter) getGid() (int, bool, error) {
gidStr, ok := d.spec.Annotations[volumehelper.VolumeGidAnnotationKey]

Expand Down Expand Up @@ -778,7 +659,7 @@ func (d *glusterfsVolumeDeleter) Delete() error {
dynamicEndpoint = pvSpec.Glusterfs.EndpointsName
}
glog.V(3).Infof("glusterfs: dynamic namespace and endpoint : [%v/%v]", dynamicNamespace, dynamicEndpoint)
err = d.plugin.deleteEndpointService(dynamicNamespace, dynamicEndpoint)
err = d.deleteEndpointService(dynamicNamespace, dynamicEndpoint)
if err != nil {
glog.Errorf("glusterfs: error when deleting endpoint/service :%v", err)
} else {
Expand Down Expand Up @@ -840,11 +721,11 @@ func (p *glusterfsVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
gidStr := strconv.FormatInt(int64(gid), 10)

pv.Annotations = map[string]string{
volumehelper.VolumeGidAnnotationKey: gidStr,
volumehelper.VolumeDynamicallyCreatedByKey: heketiAnn,
glusterTypeAnn: "file",
"Description": glusterDescAnn,
v1.MountOptionAnnotation: "auto_unmount",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you mean to drop v1.MountOptionAnnotation: "auto_unmount", ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, looks like this dropped rootfs@04bf95a#diff-e97253dd603331ffca81131a4b67264fR869 accidentally

Copy link
Contributor

@humblec humblec Jun 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liggitt good catch.. I failed to notice it, @rootfs we should not drop that mount annotation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

volumehelper.VolumeGidAnnotationKey: gidStr,
"kubernetes.io/createdby": "heketi-dynamic-provisioner",
"gluster.org/type": "file",
"Description": "Gluster: Dynamically provisioned PV",
v1.MountOptionAnnotation: "auto_unmount",
}

pv.Spec.Capacity = v1.ResourceList{
Expand All @@ -853,6 +734,33 @@ func (p *glusterfsVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
return pv, nil
}

func (p *glusterfsVolumeProvisioner) GetClusterNodes(cli *gcli.Client, cluster string) (dynamicHostIps []string, err error) {
clusterinfo, err := cli.ClusterInfo(cluster)
if err != nil {
glog.Errorf("glusterfs: failed to get cluster details: %v", err)
return nil, fmt.Errorf("failed to get cluster details: %v", err)
}

// For the dynamically provisioned volume, we gather the list of node IPs
// of the cluster on which provisioned volume belongs to, as there can be multiple
// clusters.
for _, node := range clusterinfo.Nodes {
nodei, err := cli.NodeInfo(string(node))
if err != nil {
glog.Errorf("glusterfs: failed to get hostip: %v", err)
return nil, fmt.Errorf("failed to get hostip: %v", err)
}
ipaddr := dstrings.Join(nodei.NodeAddRequest.Hostnames.Storage, "")
dynamicHostIps = append(dynamicHostIps, ipaddr)
}
glog.V(3).Infof("glusterfs: hostlist :%v", dynamicHostIps)
if len(dynamicHostIps) == 0 {
glog.Errorf("glusterfs: no hosts found: %v", err)
return nil, fmt.Errorf("no hosts found: %v", err)
}
return dynamicHostIps, nil
}

func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolumeSource, size int, err error) {
var clusterIDs []string
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
Expand Down Expand Up @@ -880,7 +788,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum
return nil, 0, fmt.Errorf("error creating volume %v", err)
}
glog.V(1).Infof("glusterfs: volume with size: %d and name: %s created", volume.Size, volume.Name)
dynamicHostIps, err := getClusterNodes(cli, volume.Cluster)
dynamicHostIps, err := p.GetClusterNodes(cli, volume.Cluster)
if err != nil {
glog.Errorf("glusterfs: error [%v] when getting cluster nodes for volume %s", err, volume)
return nil, 0, fmt.Errorf("error [%v] when getting cluster nodes for volume %s", err, volume)
Expand All @@ -892,7 +800,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum
// of volume creation.
epServiceName := dynamicEpSvcPrefix + p.options.PVC.Name
epNamespace := p.options.PVC.Namespace
endpoint, service, err := p.plugin.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC.Name)
endpoint, service, err := p.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC.Name)
if err != nil {
glog.Errorf("glusterfs: failed to create endpoint/service: %v", err)
deleteErr := cli.VolumeDelete(volume.Id)
Expand All @@ -909,6 +817,75 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum
}, sz, nil
}

func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epServiceName string, hostips []string, pvcname string) (endpoint *v1.Endpoints, service *v1.Service, err error) {

addrlist := make([]v1.EndpointAddress, len(hostips))
for i, v := range hostips {
addrlist[i].IP = v
}
endpoint = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: epServiceName,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: addrlist,
Ports: []v1.EndpointPort{{Port: 1, Protocol: "TCP"}},
}},
}
kubeClient := p.plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, nil, fmt.Errorf("glusterfs: failed to get kube client when creating endpoint service")
}
_, err = kubeClient.Core().Endpoints(namespace).Create(endpoint)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: endpoint [%s] already exist in namespace [%s]", endpoint, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create endpoint: %v", err)
return nil, nil, fmt.Errorf("error creating endpoint: %v", err)
}
service = &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: epServiceName,
Namespace: namespace,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{Protocol: "TCP", Port: 1}}}}
_, err = kubeClient.Core().Services(namespace).Create(service)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: service [%s] already exist in namespace [%s]", service, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create service: %v", err)
return nil, nil, fmt.Errorf("error creating service: %v", err)
}
return endpoint, service, nil
}

func (d *glusterfsVolumeDeleter) deleteEndpointService(namespace string, epServiceName string) (err error) {
kubeClient := d.plugin.host.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("glusterfs: failed to get kube client when deleting endpoint service")
}
err = kubeClient.Core().Services(namespace).Delete(epServiceName, nil)
if err != nil {
glog.Errorf("glusterfs: error deleting service %s/%s: %v", namespace, epServiceName, err)
return fmt.Errorf("error deleting service %s/%s: %v", namespace, epServiceName, err)
}
glog.V(1).Infof("glusterfs: service/endpoint %s/%s deleted successfully", namespace, epServiceName)
return nil
}

// parseSecret finds a given Secret instance and reads user password from it.
func parseSecret(namespace, secretName string, kubeClient clientset.Interface) (string, error) {
secretMap, err := volutil.GetSecretForPV(namespace, secretName, glusterfsPluginName, kubeClient)
Expand Down Expand Up @@ -1058,32 +1035,3 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa

return &cfg, nil
}

// getClusterNodes() returns the cluster nodes of a given cluster

func getClusterNodes(cli *gcli.Client, cluster string) (dynamicHostIps []string, err error) {
clusterinfo, err := cli.ClusterInfo(cluster)
if err != nil {
glog.Errorf("glusterfs: failed to get cluster details: %v", err)
return nil, fmt.Errorf("failed to get cluster details: %v", err)
}

// For the dynamically provisioned volume, we gather the list of node IPs
// of the cluster on which provisioned volume belongs to, as there can be multiple
// clusters.
for _, node := range clusterinfo.Nodes {
nodei, err := cli.NodeInfo(string(node))
if err != nil {
glog.Errorf("glusterfs: failed to get hostip: %v", err)
return nil, fmt.Errorf("failed to get hostip: %v", err)
}
ipaddr := dstrings.Join(nodei.NodeAddRequest.Hostnames.Storage, "")
dynamicHostIps = append(dynamicHostIps, ipaddr)
}
glog.V(3).Infof("glusterfs: hostlist :%v", dynamicHostIps)
if len(dynamicHostIps) == 0 {
glog.Errorf("glusterfs: no hosts found: %v", err)
return nil, fmt.Errorf("no hosts found: %v", err)
}
return dynamicHostIps, nil
}