Skip to content

Commit

Permalink
controller/discoverer: add readiness check
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienBalestra committed Aug 13, 2019
1 parent 747ad24 commit bbbf190
Show file tree
Hide file tree
Showing 12 changed files with 771 additions and 15 deletions.
11 changes: 11 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions cmd/local-volume-provisioner/main.go
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"time"

"github.com/heptiolabs/healthcheck"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/klog"
Expand All @@ -43,13 +44,15 @@ const maxGetNodesRetries = 3
var (
optListenAddress string
optMetricsPath string
optReadinessPath string
)

func main() {
rand.Seed(time.Now().UTC().UnixNano())
klog.InitFlags(nil)
flag.StringVar(&optListenAddress, "listen-address", ":8080", "address on which to expose metrics")
flag.StringVar(&optListenAddress, "listen-address", ":8080", "address on which to expose metrics and readiness status")
flag.StringVar(&optMetricsPath, "metrics-path", "/metrics", "path under which to expose metrics")
flag.StringVar(&optReadinessPath, "readiness-path", "/ready", "path under which to expose readiness status")
flag.Parse()
flag.Set("logtostderr", "true")

Expand Down Expand Up @@ -82,6 +85,9 @@ func main() {
client := common.SetupClient()
node := getNode(client, nodeName)

health := healthcheck.NewHandler()
http.HandleFunc(optReadinessPath, health.ReadyEndpoint)

klog.Info("Starting controller\n")
procTable := deleter.NewProcTable()
go controller.StartLocalController(client, procTable, &common.UserConfig{
Expand All @@ -96,7 +102,7 @@ func main() {
JobContainerImage: jobImage,
LabelsForPV: provisionerConfig.LabelsForPV,
SetPVOwnerRef: provisionerConfig.SetPVOwnerRef,
})
}, health)

klog.Infof("Starting metrics server at %s\n", optListenAddress)
prometheus.MustRegister([]prometheus.Collector{
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/controller.go
Expand Up @@ -23,6 +23,8 @@ import (

"k8s.io/klog"

"github.com/heptiolabs/healthcheck"

"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/cache"
"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common"
"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/deleter"
Expand All @@ -41,7 +43,7 @@ import (
)

// StartLocalController starts the sync loop for the local PV discovery and deleter
func StartLocalController(client *kubernetes.Clientset, ptable deleter.ProcTable, config *common.UserConfig) {
func StartLocalController(client *kubernetes.Clientset, ptable deleter.ProcTable, config *common.UserConfig, health healthcheck.Handler) {
klog.Info("Initializing volume cache\n")

var provisionerName string
Expand Down Expand Up @@ -90,6 +92,7 @@ func StartLocalController(client *kubernetes.Clientset, ptable deleter.ProcTable
if err != nil {
klog.Fatalf("Error initializing discoverer: %v", err)
}
health.AddReadinessCheck("discoveredReadyCheck", discoverer.ReadyCheck)

deleter := deleter.NewDeleter(runtimeConfig, cleanupTracker)

Expand Down
63 changes: 51 additions & 12 deletions pkg/discovery/discovery.go
Expand Up @@ -18,9 +18,11 @@ package discovery

import (
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"path/filepath"
"sync"
"time"

"k8s.io/klog"
Expand All @@ -47,6 +49,9 @@ type Discoverer struct {
nodeAffinity *v1.VolumeNodeAffinity
classLister storagev1listers.StorageClassLister
ownerReference *metav1.OwnerReference

ready bool
readySync sync.RWMutex
}

// NewDiscoverer creates a Discoverer object that will scan through
Expand Down Expand Up @@ -184,9 +189,26 @@ func generateVolumeNodeAffinity(node *v1.Node) (*v1.VolumeNodeAffinity, error) {

// DiscoverLocalVolumes reads the configured discovery paths, and creates PVs for the new volumes
func (d *Discoverer) DiscoverLocalVolumes() {
var discoErrors []error
for class, config := range d.DiscoveryMap {
d.discoverVolumesAtPath(class, config)
err := d.discoverVolumesAtPath(class, config)
if err != nil {
discoErrors = append(discoErrors, err)
}
}
d.readySync.Lock()
d.ready = discoErrors == nil
d.readySync.Unlock()
}

// ReadyCheck returns an error if the discovery state is not ready
func (d *Discoverer) ReadyCheck() error {
d.readySync.RLock()
defer d.readySync.RUnlock()
if d.ready {
return nil
}
return errors.New("discovererNotReady")
}

func (d *Discoverer) getReclaimPolicyFromStorageClass(name string) (v1.PersistentVolumeReclaimPolicy, error) {
Expand All @@ -209,31 +231,31 @@ func (d *Discoverer) getMountOptionsFromStorageClass(name string) ([]string, err
return class.MountOptions, nil
}

func (d *Discoverer) discoverVolumesAtPath(class string, config common.MountConfig) {
func (d *Discoverer) discoverVolumesAtPath(class string, config common.MountConfig) error {
klog.V(7).Infof("Discovering volumes at hostpath %q, mount path %q for storage class %q", config.HostDir, config.MountDir, class)

reclaimPolicy, err := d.getReclaimPolicyFromStorageClass(class)
if err != nil {
klog.Errorf("Failed to get ReclaimPolicy from storage class %q: %v", class, err)
return
return err
}

if reclaimPolicy != v1.PersistentVolumeReclaimRetain && reclaimPolicy != v1.PersistentVolumeReclaimDelete {
klog.Errorf("Unsupported ReclaimPolicy %q from storage class %q, supported policy are Retain and Delete.", reclaimPolicy, class)
return
return err
}

files, err := d.VolUtil.ReadDir(config.MountDir)
if err != nil {
klog.Errorf("Error reading directory: %v", err)
return
return err
}

// Retrieve list of mount points to iterate through discovered paths (aka files) below
mountPoints, err := d.RuntimeConfig.Mounter.List()
if err != nil {
klog.Errorf("Error retreiving mountpoints: %v", err)
return
return err
}
// Put mount points into set for faster checks below
type empty struct{}
Expand All @@ -242,12 +264,14 @@ func (d *Discoverer) discoverVolumesAtPath(class string, config common.MountConf
mountPointMap[mp.Path] = empty{}
}

var discoErrors []error
for _, file := range files {
startTime := time.Now()
filePath := filepath.Join(config.MountDir, file)
volMode, err := common.GetVolumeMode(d.VolUtil, filePath)
if err != nil {
klog.Error(err)
discoErrors = append(discoErrors, err)
continue
}
// Check if PV already exists for it
Expand All @@ -256,9 +280,10 @@ func (d *Discoverer) discoverVolumesAtPath(class string, config common.MountConf
if exists {
if pv.Spec.VolumeMode != nil && *pv.Spec.VolumeMode == v1.PersistentVolumeBlock &&
volMode == v1.PersistentVolumeFilesystem {
errStr := fmt.Sprintf("Incorrect Volume Mode: PV %q requires block mode but path %q was in fs mode.", pvName, filePath)
klog.Errorf(errStr)
d.Recorder.Eventf(pv, v1.EventTypeWarning, common.EventVolumeFailedDelete, errStr)
err := fmt.Errorf("incorrect Volume Mode: PV %q requires block mode but path %q was in fs mode", pvName, filePath)
klog.Error(err)
discoErrors = append(discoErrors, err)
d.Recorder.Eventf(pv, v1.EventTypeWarning, common.EventVolumeFailedDelete, err.Error())
}
continue
}
Expand All @@ -275,6 +300,7 @@ func (d *Discoverer) discoverVolumesAtPath(class string, config common.MountConf
mountOptions, err := d.getMountOptionsFromStorageClass(class)
if err != nil {
klog.Errorf("Failed to get mount options from storage class %s: %v", class, err)
discoErrors = append(discoErrors, err)
continue
}

Expand All @@ -285,6 +311,7 @@ func (d *Discoverer) discoverVolumesAtPath(class string, config common.MountConf
capacityByte, err = d.VolUtil.GetBlockCapacityByte(filePath)
if err != nil {
klog.Errorf("Path %q block stats error: %v", filePath, err)
discoErrors = append(discoErrors, err)
continue
}
if desireVolumeMode == v1.PersistentVolumeBlock && len(mountOptions) != 0 {
Expand All @@ -294,25 +321,36 @@ func (d *Discoverer) discoverVolumesAtPath(class string, config common.MountConf
case v1.PersistentVolumeFilesystem:
if desireVolumeMode == v1.PersistentVolumeBlock {
klog.Errorf("Path %q of filesystem mode cannot be used to create block volume", filePath)
discoErrors = append(discoErrors, err)
continue
}
// Validate that this path is an actual mountpoint
if _, isMntPnt := mountPointMap[filePath]; isMntPnt == false {
klog.Errorf("Path %q is not an actual mountpoint", filePath)
discoErrors = append(discoErrors, err)
continue
}
capacityByte, err = d.VolUtil.GetFsCapacityByte(filePath)
if err != nil {
klog.Errorf("Path %q fs stats error: %v", filePath, err)
discoErrors = append(discoErrors, err)
continue
}
default:
klog.Errorf("Path %q has unexpected volume type %q", filePath, volMode)
discoErrors = append(discoErrors, err)
continue
}

d.createPV(file, class, reclaimPolicy, mountOptions, config, capacityByte, desireVolumeMode, startTime)
err = d.createPV(file, class, reclaimPolicy, mountOptions, config, capacityByte, desireVolumeMode, startTime)
if err != nil {
discoErrors = append(discoErrors, err)
}
}
if discoErrors == nil {
return nil
}
return fmt.Errorf("%d errors while discovering volumes: %v", len(discoErrors), discoErrors)
}

func generatePVName(file, node, class string) string {
Expand All @@ -324,7 +362,7 @@ func generatePVName(file, node, class string) string {
return fmt.Sprintf("local-pv-%x", h.Sum32())
}

func (d *Discoverer) createPV(file, class string, reclaimPolicy v1.PersistentVolumeReclaimPolicy, mountOptions []string, config common.MountConfig, capacityByte int64, volMode v1.PersistentVolumeMode, startTime time.Time) {
func (d *Discoverer) createPV(file, class string, reclaimPolicy v1.PersistentVolumeReclaimPolicy, mountOptions []string, config common.MountConfig, capacityByte int64, volMode v1.PersistentVolumeMode, startTime time.Time) error {
pvName := generatePVName(file, d.Node.Name, class)
outsidePath := filepath.Join(config.HostDir, file)

Expand Down Expand Up @@ -361,12 +399,13 @@ func (d *Discoverer) createPV(file, class string, reclaimPolicy v1.PersistentVol
_, err := d.APIUtil.CreatePV(pvSpec)
if err != nil {
klog.Errorf("Error creating PV %q for volume at %q: %v", pvName, outsidePath, err)
return
return err
}
klog.Infof("Created PV %q for volume at %q", pvName, outsidePath)
mode := string(volMode)
metrics.PersistentVolumeDiscoveryTotal.WithLabelValues(mode).Inc()
metrics.PersistentVolumeDiscoveryDurationSeconds.WithLabelValues(mode).Observe(time.Since(startTime).Seconds())
return nil
}

// Round down the capacity to an easy to read value.
Expand Down

0 comments on commit bbbf190

Please sign in to comment.