Skip to content

Commit

Permalink
feat: Implement distributed snapshotting
Browse files Browse the repository at this point in the history
  • Loading branch information
nearora-msft committed Dec 22, 2021
1 parent 138d310 commit 73543dc
Show file tree
Hide file tree
Showing 16 changed files with 855 additions and 15 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ Read more about how to install the example webhook [here](deploy/kubernetes/webh

* `--retry-interval-start`: Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default value is 1 second.

*`--retry-interval-max`: Maximum retry interval of failed volume snapshot creation or deletion. Default value is 5 minutes.
* `--retry-interval-max`: Maximum retry interval of failed volume snapshot creation or deletion. Default value is 5 minutes.

* `--enable-distributed-snapshotting` : Enables each node to handle snapshots for the volumes local to that node. Off by default. It should be set to true only if `--node-deployment` parameter for the csi external snapshotter sidecar is set to true.

#### Other recognized arguments
* `--kubeconfig <path>`: Path to Kubernetes client configuration that the snapshot controller uses to connect to Kubernetes API server. When omitted, default token provided by Kubernetes will be used. This option is useful only when the snapshot controller does not run as a Kubernetes pod, e.g. for debugging.
Expand Down Expand Up @@ -172,9 +174,11 @@ Read more about how to install the example webhook [here](deploy/kubernetes/webh

* `--worker-threads`: Number of worker threads for running create snapshot and delete snapshot operations. Default value is 10.

* `--node-deployment`: Enables deploying the sidecar controller together with a CSI driver on nodes to manage node-local volumes. Off by default. This should be set to true along with the `--enable-distributed-snapshotting` in the snapshot controller parameters to make use of distributed snapshotting.

* `--retry-interval-start`: Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default value is 1 second.

*`--retry-interval-max`: Maximum retry interval of failed volume snapshot creation or deletion. Default value is 5 minutes.
* `--retry-interval-max`: Maximum retry interval of failed volume snapshot creation or deletion. Default value is 5 minutes.
#### Other recognized arguments
* `--kubeconfig <path>`: Path to Kubernetes client configuration that the CSI external-snapshotter uses to connect to Kubernetes API server. When omitted, default token provided by Kubernetes will be used. This option is useful only when the external-snapshotter does not run as a Kubernetes pod, e.g. for debugging.

Expand Down
37 changes: 31 additions & 6 deletions cmd/csi-snapshotter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ import (
"strings"
"time"

utils "github.com/kubernetes-csi/external-snapshotter/v4/pkg/utils"

"google.golang.org/grpc"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -75,11 +79,12 @@ var (
kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")

metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the sidecar controller together with a CSI driver on nodes to manage snapshots for node-local volumes.")
)

var (
Expand All @@ -98,6 +103,12 @@ func main() {
}
klog.Infof("Version: %s", version)

// If distributed snapshotting is enabled and leaderElection is also set to true, return
if *enableNodeDeployment && *leaderElection {
klog.Error("Leader election cannot happen when node-deployment is set to true")
os.Exit(1)
}

// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(*kubeconfig)
if err != nil {
Expand All @@ -122,6 +133,19 @@ func main() {

factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
var snapshotContentfactory informers.SharedInformerFactory
if *enableNodeDeployment {
node := os.Getenv("NODE_NAME")
if node == "" {
klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")
}
snapshotContentfactory = informers.NewSharedInformerFactoryWithOptions(snapClient, *resyncPeriod, informers.WithTweakListOptions(func(lo *v1.ListOptions) {
lo.LabelSelector = labels.Set{utils.VolumeSnapshotContentManagedByLabel: node}.AsSelector().String()
}),
)
} else {
snapshotContentfactory = factory
}

// Add Snapshot types to the default Kubernetes so events can be logged for them
snapshotscheme.AddToScheme(scheme.Scheme)
Expand Down Expand Up @@ -202,7 +226,7 @@ func main() {
snapClient,
kubeClient,
driverName,
factory.Snapshot().V1().VolumeSnapshotContents(),
snapshotContentfactory.Snapshot().V1().VolumeSnapshotContents(),
factory.Snapshot().V1().VolumeSnapshotClasses(),
snapShotter,
*csiTimeout,
Expand All @@ -216,6 +240,7 @@ func main() {
run := func(context.Context) {
// run...
stopCh := make(chan struct{})
snapshotContentfactory.Start(stopCh)
factory.Start(stopCh)
coreFactory.Start(stopCh)
go ctrl.Run(*threads, stopCh)
Expand Down
17 changes: 13 additions & 4 deletions cmd/snapshot-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync"
"time"

v1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -64,10 +65,11 @@ var (
kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")

httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics, will listen (example: :8080). The default is empty string, which means the server is disabled.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics, will listen (example: :8080). The default is empty string, which means the server is disabled.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
enableDistributedSnapshotting = flag.Bool("enable-distributed-snapshotting", false, "Enables each node to handle snapshotting for the local volumes created on that node")
)

var (
Expand Down Expand Up @@ -147,6 +149,11 @@ func main() {

factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
var nodeInformer v1.NodeInformer

if *enableDistributedSnapshotting {
nodeInformer = coreFactory.Core().V1().Nodes()
}

// Create and register metrics manager
metricsManager := metrics.NewMetricsManager()
Expand Down Expand Up @@ -174,10 +181,12 @@ func main() {
factory.Snapshot().V1().VolumeSnapshotContents(),
factory.Snapshot().V1().VolumeSnapshotClasses(),
coreFactory.Core().V1().PersistentVolumeClaims(),
nodeInformer,
metricsManager,
*resyncPeriod,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
*enableDistributedSnapshotting,
)

if err := ensureCustomResourceDefinitionsExist(snapClient); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ rules:
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots/status"]
verbs: ["update", "patch"]

# Enable this RBAC rule only when using distributed snapshotting, i.e. when the enable-distributed-snapshotting flag is set to true
# - apiGroups: [""]
# resources: ["nodes"]
# verbs: ["get", "list", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
k8s.io/apimachinery v0.23.0
k8s.io/client-go v0.23.0
k8s.io/component-base v0.23.0
k8s.io/component-helpers v0.23.0
k8s.io/klog/v2 v2.30.0
k8s.io/kubernetes v1.23.0
)
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ k8s.io/cluster-bootstrap v0.23.0/go.mod h1:VltEnKWfrRTiKgOXp3ts3vh7yqNlH6KFKFflo
k8s.io/code-generator v0.23.0/go.mod h1:vQvOhDXhuzqiVfM/YHp+dmg10WDZCchJVObc9MvowsE=
k8s.io/component-base v0.23.0 h1:UAnyzjvVZ2ZR1lF35YwtNY6VMN94WtOnArcXBu34es8=
k8s.io/component-base v0.23.0/go.mod h1:DHH5uiFvLC1edCpvcTDV++NKULdYYU6pR9Tt3HIKMKI=
k8s.io/component-helpers v0.23.0 h1:qNbqN10QTefiWcCOPkHL/0nn81sdKVv6ZgEXcSyot/U=
k8s.io/component-helpers v0.23.0/go.mod h1:liXMh6FZS4qamKtMJQ7uLHnFe3tlC86RX5mJEk/aerg=
k8s.io/controller-manager v0.23.0/go.mod h1:6/IKItSv6p9FY3mSbHgsOYmt4y+HDxiC5hEFg9rJVc8=
k8s.io/cri-api v0.23.0/go.mod h1:2edENu3/mkyW3c6fVPPPaVGEFbLRacJizBbSp7ZOLOo=
Expand Down
2 changes: 2 additions & 0 deletions pkg/common-controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,10 +838,12 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
informerFactory.Snapshot().V1().VolumeSnapshotContents(),
informerFactory.Snapshot().V1().VolumeSnapshotClasses(),
coreFactory.Core().V1().PersistentVolumeClaims(),
nil,
metricsManager,
60*time.Second,
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
false,
)

ctrl.eventRecorder = record.NewFakeRecorder(1000)
Expand Down
37 changes: 37 additions & 0 deletions pkg/common-controller/snapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
klog "k8s.io/klog/v2"

crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
Expand Down Expand Up @@ -671,6 +672,18 @@ func (ctrl *csiSnapshotCommonController) createSnapshotContent(snapshot *crdv1.V
},
}

if ctrl.enableDistributedSnapshotting {
nodeName, err := ctrl.getManagedByNode(volume)
if err != nil {
return nil, err
}
if nodeName != "" {
snapshotContent.Labels = map[string]string{
utils.VolumeSnapshotContentManagedByLabel: nodeName,
}
}
}

// Set AnnDeletionSecretRefName and AnnDeletionSecretRefNamespace
if snapshotterSecretRef != nil {
klog.V(5).Infof("createSnapshotContent: set annotation [%s] on content [%s].", utils.AnnDeletionSecretRefName, snapshotContent.Name)
Expand Down Expand Up @@ -1655,3 +1668,27 @@ func (ctrl *csiSnapshotCommonController) checkAndSetInvalidSnapshotLabel(snapsho

return updatedSnapshot, nil
}

func (ctrl *csiSnapshotCommonController) getManagedByNode(pv *v1.PersistentVolume) (string, error) {
if pv.Spec.NodeAffinity == nil {
klog.V(5).Infof("NodeAffinity not set for pv %s", pv.Name)
return "", nil
}
nodeSelectorTerms := pv.Spec.NodeAffinity.Required

nodes, err := ctrl.nodeLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get the list of nodes: %q", err)
return "", err
}

for _, node := range nodes {
match, _ := corev1helpers.MatchNodeSelectorTerms(node, nodeSelectorTerms)
if match {
return node.Name, nil
}
}

klog.Errorf("failed to find nodes that match the node affinity requirements for pv[%s]", pv.Name)
return "", nil
}
20 changes: 19 additions & 1 deletion pkg/common-controller/snapshot_controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,17 @@ type csiSnapshotCommonController struct {
classListerSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcListerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced

snapshotStore cache.Store
contentStore cache.Store

metricsManager metrics.MetricsManager

resyncPeriod time.Duration

enableDistributedSnapshotting bool
}

// NewCSISnapshotController returns a new *csiSnapshotCommonController
Expand All @@ -74,10 +78,12 @@ func NewCSISnapshotCommonController(
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
nodeInformer coreinformers.NodeInformer,
metricsManager metrics.MetricsManager,
resyncPeriod time.Duration,
snapshotRateLimiter workqueue.RateLimiter,
contentRateLimiter workqueue.RateLimiter,
enableDistributedSnapshotting bool,
) *csiSnapshotCommonController {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
Expand Down Expand Up @@ -125,6 +131,13 @@ func NewCSISnapshotCommonController(
ctrl.classLister = volumeSnapshotClassInformer.Lister()
ctrl.classListerSynced = volumeSnapshotClassInformer.Informer().HasSynced

ctrl.enableDistributedSnapshotting = enableDistributedSnapshotting

if enableDistributedSnapshotting {
ctrl.nodeLister = nodeInformer.Lister()
ctrl.nodeListerSynced = nodeInformer.Informer().HasSynced
}

return ctrl
}

Expand All @@ -135,7 +148,12 @@ func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}
klog.Infof("Starting snapshot controller")
defer klog.Infof("Shutting snapshot controller")

if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced) {
informersSynced := []cache.InformerSynced{ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced}
if ctrl.enableDistributedSnapshotting {
informersSynced = append(informersSynced, ctrl.nodeListerSynced)
}

if !cache.WaitForCacheSync(stopCh, informersSynced...) {
klog.Errorf("Cannot sync caches")
return
}
Expand Down
Loading

0 comments on commit 73543dc

Please sign in to comment.