Skip to content

Commit

Permalink
Implement volume health monitoring feature to detect abnormal volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveenrajmani committed Sep 25, 2023
1 parent 74c3e9f commit 2702abe
Show file tree
Hide file tree
Showing 22 changed files with 1,014 additions and 61 deletions.
18 changes: 17 additions & 1 deletion cmd/directpv/node-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"os"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/minio/directpv/pkg/consts"
Expand All @@ -33,7 +34,11 @@ import (
"k8s.io/klog/v2"
)

var metricsPort = consts.MetricsPort
var (
metricsPort = consts.MetricsPort
volumeHealthMonitorInterval = 10 * time.Minute
enableVolumeHealthMonitor bool
)

var nodeServerCmd = &cobra.Command{
Use: consts.NodeServerName,
Expand All @@ -56,6 +61,8 @@ var nodeServerCmd = &cobra.Command{

func init() {
nodeServerCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", metricsPort, "Metrics port at "+consts.AppPrettyName+" exports metrics data")
nodeServerCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitor", enableVolumeHealthMonitor, "Enable volume health monitoring")
nodeServerCmd.PersistentFlags().DurationVar(&volumeHealthMonitorInterval, "volume-health-monitor-interval", volumeHealthMonitorInterval, "Interval for volume health monitoring in duration. Example: '20m','1h'")
}

func startNodeServer(ctx context.Context) error {
Expand Down Expand Up @@ -114,6 +121,15 @@ func startNodeServer(ctx context.Context) error {
}
}()

if enableVolumeHealthMonitor {
go func() {
if err := volume.RunHealthMonitor(ctx, nodeID, volumeHealthMonitorInterval); err != nil {
klog.ErrorS(err, "unable to run volume health monitor")
errCh <- err
}
}()
}

return <-errCh
}

Expand Down
40 changes: 23 additions & 17 deletions cmd/kubectl-directpv/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,22 @@ import (
)

var (
image = consts.AppName + ":" + Version
registry = "quay.io"
org = "minio"
nodeSelectorArgs = []string{}
tolerationArgs = []string{}
seccompProfile = ""
apparmorProfile = ""
imagePullSecrets = []string{}
nodeSelector map[string]string
tolerations []corev1.Toleration
k8sVersion = "1.27.0"
kubeVersion *version.Version
legacyFlag bool
declarativeFlag bool
openshiftFlag bool
image = consts.AppName + ":" + Version
registry = "quay.io"
org = "minio"
nodeSelectorArgs = []string{}
tolerationArgs = []string{}
seccompProfile = ""
apparmorProfile = ""
imagePullSecrets = []string{}
nodeSelector map[string]string
tolerations []corev1.Toleration
k8sVersion = "1.27.0"
kubeVersion *version.Version
legacyFlag bool
declarativeFlag bool
openshiftFlag bool
enableVolumeHealthMonitor bool
)

var installCmd = &cobra.Command{
Expand Down Expand Up @@ -82,7 +83,10 @@ var installCmd = &cobra.Command{
$ kubectl {PLUGIN_NAME} install --apparmor-profile directpv
7. Install DirectPV with seccomp profile
$ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json`,
$ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json
8. Install DirectPV with volume health monitoring enabled
$ kubectl {PLUGIN_NAME} install --enable-volume-health-monitoring`,
`{PLUGIN_NAME}`,
consts.AppName,
),
Expand Down Expand Up @@ -123,6 +127,7 @@ func init() {
installCmd.PersistentFlags().BoolVar(&declarativeFlag, "declarative", declarativeFlag, "Output YAML for declarative installation")
installCmd.PersistentFlags().MarkHidden("declarative")
installCmd.PersistentFlags().BoolVar(&openshiftFlag, "openshift", openshiftFlag, "Use OpenShift specific installation")
installCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitoring", enableVolumeHealthMonitor, "Enable volume health monitoring")
}

func validateNodeSelectorArgs() error {
Expand Down Expand Up @@ -305,8 +310,9 @@ func installMain(ctx context.Context) {
}
}
}
args.Declarative = declarativeFlag
args.Openshift = openshiftFlag
args.Declarative = declarativeFlag
args.EnableVolumeHealthMonitor = enableVolumeHealthMonitor

var failed bool
var installedComponents []installer.Component
Expand Down
2 changes: 2 additions & 0 deletions cmd/kubectl-directpv/list_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ func listVolumesMain(ctx context.Context) {
status = "Released"
case volume.IsDriveLost():
status = "Lost"
case volume.HasError():
status = "Error"
case volume.IsPublished():
status = "Bounded"
}
Expand Down
18 changes: 18 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,21 @@ scrape_configs:
action: replace
target_label: kubernetes_name
```

# Volume health monitoring

This is a [CSI feature](https://kubernetes.io/docs/concepts/storage/volume-health-monitoring/) introduced as an Alpha feature in Kubernetes v1.19 and a second Alpha was done in v1.21. This feature is to detect "abnormal" volume conditions and report them as events on PVCs and Pods. A DirectPV volume will be considered as "abnormal" if the respective volume mounts are not present in the host.

For node side monitoring, the feature gate `CSIVolumeHealth` needs to be enabled. However, DirectPV also installs external health monitor controller which monitors and reports volume health events to PVCs.

To enable volume health monitoring, Install directpv with `--enable-volume-health-monitoring` flag.

```sh
kubectl directpv install --enable-volume-health-monitoring
```

For private registries, please note that the following image is required for enabling volume health monitoring

```
quay.io/minio/csi-external-health-monitor-controller:v0.10.0
```
11 changes: 8 additions & 3 deletions pkg/apis/directpv.min.io/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,28 @@ type VolumeConditionType string

// Enum value of VolumeConditionType type.
const (
VolumeConditionTypeLost VolumeConditionType = "Lost"
VolumeConditionTypeLost VolumeConditionType = "Lost"
VolumeConditionTypeError VolumeConditionType = "Error"
)

// VolumeConditionReason denotes volume reason. Allows maximum upto 1024 chars.
type VolumeConditionReason string

// Enum values of VolumeConditionReason type.
const (
VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
VolumeConditionReasonNotMounted VolumeConditionReason = "NotMounted"
VolumeConditionReasonNoError VolumeConditionReason = "NoError"
)

// VolumeConditionMessage denotes drive message. Allows maximum upto 32768 chars.
type VolumeConditionMessage string

// Enum values of VolumeConditionMessage type.
const (
VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
VolumeConditionMessageStagingPathNotMounted VolumeConditionMessage = "Staging path is umounted from the host. Please restart the workload"
VolumeConditionMessageTargetPathNotMounted VolumeConditionMessage = "Target path is umounted from the host. Please restart the workload"
)

// DriveConditionType denotes drive condition. Allows maximum upto 316 chars.
Expand Down
41 changes: 41 additions & 0 deletions pkg/apis/directpv.min.io/v1beta1/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package v1beta1
import (
"strconv"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/k8s"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -123,6 +125,12 @@ func (volume DirectPVVolume) IsDriveLost() bool {
return false
}

// HasError returns if the volume is in error state.
func (volume DirectPVVolume) HasError() bool {
condition := k8s.GetConditionByType(volume.Status.Conditions, string(types.VolumeConditionTypeError))
return condition != nil && condition.Status == metav1.ConditionTrue
}

// SetDriveLost sets associated drive is lost.
func (volume *DirectPVVolume) SetDriveLost() {
c := metav1.Condition{
Expand Down Expand Up @@ -316,6 +324,39 @@ func (volume *DirectPVVolume) Resume() bool {
return volume.RemoveLabel(types.SuspendLabelKey)
}

// ResetStageMountErrorCondition resets the stage volume mount error condition.
func (volume *DirectPVVolume) ResetStageMountErrorCondition() {
k8s.ResetConditionIfMatches(volume.Status.Conditions,
string(types.VolumeConditionTypeError),
string(types.VolumeConditionReasonNotMounted),
string(types.VolumeConditionMessageStagingPathNotMounted),
string(types.VolumeConditionReasonNoError))
}

// ResetTargetMountErrorCondition resets the target volume mount error condition.
func (volume *DirectPVVolume) ResetTargetMountErrorCondition() {
k8s.ResetConditionIfMatches(volume.Status.Conditions,
string(types.VolumeConditionTypeError),
string(types.VolumeConditionReasonNotMounted),
string(types.VolumeConditionMessageTargetPathNotMounted),
string(types.VolumeConditionReasonNoError))
}

// GetCSIVolumeCondition returns the CSI volume condition.
func (volume *DirectPVVolume) GetCSIVolumeCondition() *csi.VolumeCondition {
var isAbnormal bool
var message string
errorCondition := k8s.GetConditionByType(volume.Status.Conditions, string(types.VolumeConditionTypeError))
if errorCondition != nil && errorCondition.Status == metav1.ConditionTrue {
isAbnormal = true
message = errorCondition.Message
}
return &csi.VolumeCondition{
Abnormal: isAbnormal,
Message: message,
}
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// DirectPVVolumeList denotes list of volumes.
Expand Down
3 changes: 3 additions & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,7 @@ const (

// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"

// Volume Health Monitor
VolumeHealthMonitorIntervalInDuration = "10m"
)
3 changes: 3 additions & 0 deletions pkg/consts/consts.go.in
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,7 @@ const (

// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"

// Volume Health Monitor
VolumeHealthMonitorIntervalInDuration = "10m"
)
82 changes: 78 additions & 4 deletions pkg/csi/controller/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,21 @@ func (c *Server) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerG
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_GET_VOLUME},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_VOLUME_CONDITION},
},
},
},
}, nil
}
Expand Down Expand Up @@ -359,8 +374,52 @@ func (c *Server) ControllerExpandVolume(ctx context.Context, req *csi.Controller

// ListVolumes implements ListVolumes controller RPC
// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#listvolumes
func (c *Server) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "unimplemented")
func (c *Server) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
result, err := client.VolumeClient().List(ctx, metav1.ListOptions{
Limit: int64(req.GetMaxEntries()),
Continue: req.GetStartingToken(),
})
if err != nil {
if req.GetStartingToken() != "" {
return nil, status.Errorf(codes.Aborted, "unable to list volumes: %v", err)
}
return nil, status.Errorf(codes.Internal, "unable to list volumes: %v", err)
}
var volumeEntries []*csi.ListVolumesResponse_Entry
for _, volume := range result.Items {
csiVolume, err := getCSIVolume(ctx, &volume)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
volumeEntries = append(volumeEntries, &csi.ListVolumesResponse_Entry{
Volume: csiVolume,
Status: &csi.ListVolumesResponse_VolumeStatus{
VolumeCondition: volume.GetCSIVolumeCondition(),
},
})
}
return &csi.ListVolumesResponse{
Entries: volumeEntries,
NextToken: result.Continue,
}, nil
}

func getCSIVolume(ctx context.Context, volume *types.Volume) (*csi.Volume, error) {
drive, err := client.DriveClient().Get(ctx, string(volume.GetDriveID()), metav1.GetOptions{
TypeMeta: types.NewDriveTypeMeta(),
})
if err != nil {
return nil, err
}
return &csi.Volume{
CapacityBytes: volume.Status.TotalCapacity,
VolumeId: volume.Name,
AccessibleTopology: []*csi.Topology{
{
Segments: drive.Status.Topology,
},
},
}, nil
}

// ControllerPublishVolume - controller RPC to publish volumes
Expand All @@ -377,8 +436,23 @@ func (c *Server) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerU

// ControllerGetVolume - controller RPC for get volume
// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#controllergetvolume
func (c *Server) ControllerGetVolume(_ context.Context, _ *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "unimplemented")
func (c *Server) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
volume, err := client.VolumeClient().Get(
ctx, req.GetVolumeId(), metav1.GetOptions{TypeMeta: types.NewVolumeTypeMeta()},
)
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
csiVolume, err := getCSIVolume(ctx, volume)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.ControllerGetVolumeResponse{
Volume: csiVolume,
Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
VolumeCondition: volume.GetCSIVolumeCondition(),
},
}, nil
}

// ListSnapshots - controller RPC for listing snapshots
Expand Down
Loading

0 comments on commit 2702abe

Please sign in to comment.