Merge pull request #82 from gabemontero/vol-watch-pruner
add pruning ticker for stale volumes on disk, unneeded secret/configmap informers
openshift-merge-robot committed Dec 13, 2021
2 parents f96ba31 + e6500f7 commit 3943b72
Showing 7 changed files with 169 additions and 17 deletions.
37 changes: 29 additions & 8 deletions cmd/main.go
@@ -15,6 +15,7 @@ import (
"k8s.io/klog/v2"

sharev1clientset "github.com/openshift/client-go/sharedresource/clientset/versioned"
"github.com/openshift/csi-driver-shared-resource/pkg/cache"
"github.com/openshift/csi-driver-shared-resource/pkg/client"
"github.com/openshift/csi-driver-shared-resource/pkg/config"
"github.com/openshift/csi-driver-shared-resource/pkg/controller"
@@ -82,9 +83,34 @@ var rootCmd = &cobra.Command{
os.Exit(1)
}

go runOperator(cfg)
c, err := controller.NewController(cfg.GetShareRelistInterval(), cfg.RefreshResources)
if err != nil {
fmt.Printf("Failed to set up controller: %s", err.Error())
os.Exit(1)
}
prunerTicker := time.NewTicker(cfg.GetShareRelistInterval())
prunerDone := make(chan struct{})
go func() {
for {
select {
case <-prunerDone:
return
case <-prunerTicker.C:
// remove any orphaned volume files on disk
driver.Prune(client.GetClient())
if cfg.RefreshResources {
// in case we missed delete events, clean up unneeded secret/configmap informers
c.PruneSecretInformers(cache.NamespacesWithSharedSecrets())
c.PruneConfigMapInformers(cache.NamespacesWithSharedConfigMaps())
}
}
}
}()

go runOperator(c, cfg)
go watchForConfigChanges(cfgManager)
driver.Run()
prunerDone <- struct{}{}
},
}

@@ -127,14 +153,9 @@ func loadSharedresourceClientset() (sharev1clientset.Interface, error) {

// runOperator runs the given Controller, based on the informed configuration, until
// OS signals are trapped.
func runOperator(cfg *config.Config) {
c, err := controller.NewController(cfg.GetShareRelistInterval(), cfg.RefreshResources)
if err != nil {
fmt.Printf("Failed to set up controller: %s", err.Error())
os.Exit(1)
}
func runOperator(c *controller.Controller, cfg *config.Config) {
stopCh := setupSignalHandler()
err = c.Run(stopCh)
err := c.Run(stopCh)
if err != nil {
fmt.Printf("Controller exited: %s", err.Error())
os.Exit(1)
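For reference, a minimal standalone sketch of the ticker/stop-channel pattern the cmd/main.go change above introduces (names and interval are illustrative, not the driver's actual API): a background goroutine prunes on every tick until the main flow signals shutdown over a done channel.

package main

import (
	"fmt"
	"time"
)

func main() {
	// the real driver derives this interval from cfg.GetShareRelistInterval()
	ticker := time.NewTicker(10 * time.Minute)
	defer ticker.Stop()
	done := make(chan struct{})

	go func() {
		for {
			select {
			case <-done:
				return // shutdown requested by the main goroutine
			case <-ticker.C:
				fmt.Println("prune stale volume files and informers here")
			}
		}
	}()

	// ... the long-running gRPC server would block here ...
	done <- struct{}{} // stop the pruner goroutine on exit
}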
2 changes: 2 additions & 0 deletions pkg/controller/controller.go
@@ -169,6 +169,7 @@ func (c *Controller) PruneSecretInformers(namespaces map[string]struct{}) {
c.secretWatchObjs.Range(func(key, value interface{}) bool {
ns := key.(string)
if _, ok := namespaces[ns]; !ok {
klog.V(2).Infof("unregistering secret informer for namespace %s", ns)
c.UnregisterSecretInformer(ns)
}
return true
@@ -206,6 +207,7 @@ func (c *Controller) PruneConfigMapInformers(namespaces map[string]struct{}) {
c.configMapWatchObjs.Range(func(key, value interface{}) bool {
ns := key.(string)
if _, ok := namespaces[ns]; !ok {
klog.V(2).Infof("unregistering configmap informer for namespace %s", ns)
c.UnregisterConfigMapInformer(ns)
}
return true
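Both pruners above share the same prune-by-diff idiom. A hedged sketch of that shape (illustrative names; assumes the standard library sync package is imported): iterate the sync.Map of per-namespace informers and unregister any namespace missing from the set that still references shared resources.

func pruneInformers(watched *sync.Map, active map[string]struct{}, unregister func(ns string)) {
	watched.Range(func(key, _ interface{}) bool {
		ns := key.(string)
		if _, ok := active[ns]; !ok {
			unregister(ns) // no share references this namespace anymore
		}
		return true // true keeps Range iterating over remaining entries
	})
}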
85 changes: 82 additions & 3 deletions pkg/hostpath/hostpath.go
@@ -17,17 +17,22 @@ limitations under the License.
package hostpath

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"k8s.io/utils/mount"
"os"
"path/filepath"
"strings"
"sync"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

sharev1alpha1 "github.com/openshift/api/sharedresource/v1alpha1"
@@ -48,7 +53,8 @@ type hostPath struct {
ids *identityServer
ns *nodeServer

root string
root string
volMapRoot string
}

var (
@@ -97,6 +103,8 @@ type HostPathDriver interface {
mapVolumeToPod(hpv *hostPathVolume) error
Run()
GetRoot() string
GetVolMapRoot() string
Prune(kubeClient kubernetes.Interface)
}

// NewHostPathDriver instantiate the HostPathDriver with the driver details. Optionally, a
@@ -140,6 +148,7 @@ func NewHostPathDriver(root, volMapRoot, driverName, nodeID, endpoint string, ma
endpoint: endpoint,
maxVolumesPerNode: maxVolumesPerNode,
root: root,
volMapRoot: volMapRoot,
}

if err := hp.loadVolsFromDisk(); err != nil {
@@ -153,6 +162,10 @@ func (hp *hostPath) GetRoot() string {
return hp.root
}

func (hp *hostPath) GetVolMapRoot() string {
return hp.volMapRoot
}

func (hp *hostPath) Run() {
// Create GRPC servers
hp.ids = NewIdentityServer(hp.name, hp.version)
@@ -775,7 +788,7 @@ func (hp *hostPath) deleteHostpathVolume(volID string) error {
func (hp *hostPath) loadVolsFromDisk() error {
klog.V(2).Infof("loadVolsFromDisk")
defer klog.V(2).Infof("loadVolsFromDisk exit")
return filepath.Walk(VolumeMapRoot, func(path string, info os.FileInfo, err error) error {
return filepath.Walk(hp.volMapRoot, func(path string, info os.FileInfo, err error) error {
if info == nil {
return nil
}
@@ -786,7 +799,7 @@ func (hp *hostPath) loadVolsFromDisk() error {
if info.IsDir() {
return nil
}
fileName := filepath.Join(VolumeMapRoot, info.Name())
fileName := filepath.Join(hp.volMapRoot, info.Name())
dataFile, oerr := os.Open(fileName)
if oerr != nil {
klog.V(0).Infof("loadVolsFromDisk error opening file %s: %s", fileName, err.Error())
@@ -818,3 +831,69 @@ func (hp *hostPath) loadVolsFromDisk() error {
return nil
})
}

// Prune inspects all the volumes stored on disk and checks whether their associated pods still exist. If not, the volume
// file in question is deleted from disk.
func (hp *hostPath) Prune(kubeClient kubernetes.Interface) {
filesToPrune := map[string]hostPathVolume{}
filepath.Walk(hp.volMapRoot, func(path string, info os.FileInfo, err error) error {
if info == nil {
return nil
}
if err != nil {
// continue to next file
klog.V(5).Infof("Prune: for path %s given error %s", path, err.Error())
return nil
}
if info.IsDir() {
return nil
}
fileName := filepath.Join(hp.volMapRoot, info.Name())
dataFile, oerr := os.Open(fileName)
if oerr != nil {
klog.V(0).Infof("loadVolsFromDisk error opening file %s: %s", fileName, err.Error())
// continue to next file
return nil
}
dataDecoder := json.NewDecoder(dataFile)
hpv := &hostPathVolume{}
err = dataDecoder.Decode(hpv)
if err != nil {
klog.V(0).Infof("loadVolsFromDisk error decoding file %s: %s", fileName, err.Error())
// continue to next file
return nil
}
if hpv == nil {
klog.V(0).Infof("loadVolsFromDisk nil but no error for file %s", fileName)
// continue to next file
return nil
}
hpv.Lock = &sync.Mutex{}
_, err = kubeClient.CoreV1().Pods(hpv.GetPodNamespace()).Get(context.TODO(), hpv.GetPodName(), metav1.GetOptions{})
if err != nil && kerrors.IsNotFound(err) {
klog.V(2).Infof("pruner: hpv %q: %s", fileName, err.Error())
filesToPrune[fileName] = *hpv
}
return nil
})
if len(filesToPrune) == 0 {
return
}
mounter := mount.New("")
// a bit paranoid, but not deleting files in the walk loop in case that can mess up filepath.Walk's iteration logic
for file, hpv := range filesToPrune {
err := os.Remove(file)
if err != nil {
klog.Warningf("pruner: unable to prune file %q: %s", file, err.Error())
continue
}
klog.V(2).Infof("pruner: removed volume file %q with missing pod from disk", file)
err = mounter.Unmount(hpv.GetVolPathAnchorDir())
if err != nil {
klog.V(2).Infof("pruner: issue unmounting for volume %s mount id %s: %s", hpv.GetVolID(), hpv.GetVolPathAnchorDir(), err.Error())
} else {
klog.V(2).Infof("pruner: successfully unmounted volume %s mount id %s", hpv.GetVolID(), hpv.GetVolPathAnchorDir())
}
}

}
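The "a bit paranoid" comment above reflects a deliberate collect-then-delete shape: candidates are gathered during filepath.Walk and removed only after the walk finishes. A minimal fragment of that pattern, with a hypothetical isOrphaned predicate standing in for the pod lookup:

var stale []string
filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
	if err != nil || info == nil || info.IsDir() {
		return nil // skip unreadable entries and directories, keep walking
	}
	if isOrphaned(path) { // hypothetical check, e.g. the NotFound pod lookup above
		stale = append(stale, path)
	}
	return nil
})
// deletion happens only after Walk returns, so it cannot disturb iteration
for _, p := range stale {
	os.Remove(p)
}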
29 changes: 29 additions & 0 deletions pkg/hostpath/hostpath_test.go
@@ -18,10 +18,12 @@ package hostpath

import (
"fmt"
"io/fs"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

@@ -407,6 +409,33 @@ func TestDeleteReAddShare(t *testing.T) {
}
}

func TestPruner(t *testing.T) {
hp, dir1, dir2, err := testHostPathDriver(t.Name(), nil)
if err != nil {
t.Fatalf("%s", err.Error())
}
defer os.RemoveAll(dir1)
defer os.RemoveAll(dir2)
targetPath, err := ioutil.TempDir(os.TempDir(), t.Name())
if err != nil {
t.Fatalf("err on targetPath %s", err.Error())
}
defer os.RemoveAll(targetPath)

hpv := &hostPathVolume{VolID: "vol", PodNamespace: "ns", PodName: "pod", Lock: &sync.Mutex{}}
hpv.StoreToDisk(hp.GetVolMapRoot())
k8sClient := fakekubeclientset.NewSimpleClientset()
client.SetClient(k8sClient)
hp.Prune(k8sClient)
prunedFile := filepath.Join(hp.GetVolMapRoot(), "vol")
filepath.Walk(hp.GetVolMapRoot(), func(path string, info fs.FileInfo, err error) error {
if path == prunedFile {
t.Fatalf("file %q was not pruned", path)
}
return nil
})
}

func TestUpdateShare(t *testing.T) {
readOnly := []bool{true, false}
for _, ro := range readOnly {
6 changes: 3 additions & 3 deletions pkg/hostpath/hpv.go
@@ -195,20 +195,20 @@ func (hpv *hostPathVolume) SetRefresh(refresh bool) {
hpv.Refresh = refresh
}

func (hpv *hostPathVolume) StoreToDisk() error {
func (hpv *hostPathVolume) StoreToDisk(volMapRoot string) error {
hpv.Lock.Lock()
defer hpv.Lock.Unlock()
klog.V(4).Infof("storeVolToDisk %s", hpv.VolID)
defer klog.V(4).Infof("storeVolToDisk exit %s", hpv.VolID)

f, terr := os.Open(VolumeMapRoot)
f, terr := os.Open(volMapRoot)
if terr != nil {
// catch for unit tests
return nil
}
defer f.Close()

filePath := filepath.Join(VolumeMapRoot, hpv.VolID)
filePath := filepath.Join(volMapRoot, hpv.VolID)
dataFile, err := os.Create(filePath)
if err != nil {
return err
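For context, a minimal self-contained sketch of the persistence scheme StoreToDisk implements: one JSON document per volume at <volMapRoot>/<VolID>, which loadVolsFromDisk and Prune later decode. The volMeta type and its fields here are illustrative stand-ins, not the driver's actual struct.

package volmap

import (
	"encoding/json"
	"os"
	"path/filepath"
)

// volMeta is a stand-in for the driver's hostPathVolume metadata
type volMeta struct {
	VolID        string
	PodNamespace string
	PodName      string
}

func store(volMapRoot string, v volMeta) error {
	f, err := os.Create(filepath.Join(volMapRoot, v.VolID)) // file name is the volume ID
	if err != nil {
		return err
	}
	defer f.Close()
	return json.NewEncoder(f).Encode(v)
}

func load(path string) (volMeta, error) {
	var v volMeta
	f, err := os.Open(path)
	if err != nil {
		return v, err
	}
	defer f.Close()
	return v, json.NewDecoder(f).Decode(&v)
}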
4 changes: 2 additions & 2 deletions pkg/hostpath/nodeserver.go
@@ -270,7 +270,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
err.Error()))
}

if err := vol.StoreToDisk(); err != nil {
if err := vol.StoreToDisk(ns.hp.GetVolMapRoot()); err != nil {
metrics.IncMountCounters(false)
klog.Errorf("failed to persist driver volume metadata to disk: %s", err.Error())
return nil, status.Error(codes.Internal, err.Error())
@@ -314,7 +314,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to delete volume: %s", err))
}

filePath := filepath.Join(VolumeMapRoot, hpv.GetVolID())
filePath := filepath.Join(ns.hp.GetVolMapRoot(), hpv.GetVolID())
if err := os.Remove(filePath); err != nil {
klog.Errorf("failed to persist driver volume metadata to disk: %s", err.Error())
return nil, status.Error(codes.Internal, err.Error())
Expand Down
23 changes: 22 additions & 1 deletion test/framework/pod.go
@@ -49,7 +49,7 @@ func CreateTestPod(t *TestArgs) {
Containers: []corev1.Container{
{
Name: containerName,
Image: "registry.access.redhat.com/ubi8/ubi",
Image: "registry.redhat.io/ubi8/ubi",
Command: []string{"sleep", "1000000"},
VolumeMounts: []corev1.VolumeMount{
{
@@ -96,6 +96,27 @@ func CreateTestPod(t *TestArgs) {
t.T.Logf("%s: end create test pod %s", time.Now().String(), t.Name)

if t.TestPodUp {
eventClient := kubeClient.CoreV1().Events(t.Name)
t.T.Logf("%s: start verify image for test pod %s is pulled", time.Now().String(), t.Name)
err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (done bool, err error) {
eventList, err := eventClient.List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.T.Logf("%s: error list events for %s: %s", time.Now().String(), t.Name, err.Error())
return false, nil
}
for _, event := range eventList.Items {
if event.InvolvedObject.Kind == "Pod" &&
(strings.Contains(event.Reason, "Pulled") || strings.Contains(event.Reason, "Already")) {
t.T.Logf("%s: image for test pod %s available: %s", time.Now().String(), t.Name, event.Reason)
return true, nil
}
}
return false, nil
})
// even if we do not pull the image in time, we'll still give it time to sort out as part of the Pod getting to Running state
if err != nil {
t.T.Logf("%s: did not see image pull event for pod %s but will still see if it comes up in time", time.Now().String(), t.Name)
}
t.T.Logf("%s: start verify test pod %s is up", time.Now().String(), t.Name)
err = wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) {
pod, err = podClient.Get(context.TODO(), t.Name, metav1.GetOptions{})
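The polling above uses wait.PollImmediate from k8s.io/apimachinery/pkg/util/wait: the condition function runs once immediately and then at each interval until it returns true, returns a non-nil error, or the timeout elapses. A hedged fragment of the idiom, with an illustrative condition:

err := wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
	pulled, err := checkImagePulled() // hypothetical event inspection
	if err != nil {
		return false, nil // treat transient errors as "not yet" and retry
	}
	return pulled, nil
})
if err != nil {
	// timed out; callers can still fall through to the pod-Running poll
}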
