Skip to content

Commit

Permalink
disk monitor: enrich and record error messages
Browse files Browse the repository at this point in the history
Longhorn 7095

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit authored and David Ko committed Dec 28, 2023
1 parent 93801ee commit b865e37
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 54 deletions.
75 changes: 43 additions & 32 deletions controller/monitor/disk_monitor.go
Expand Up @@ -30,6 +30,11 @@ const (
volumeMetaData = "volume.meta"
)

// DiskServiceClient pairs a disk service client with the error (if any)
// encountered while creating it, so callers can surface a meaningful
// per-data-engine failure message instead of silently seeing a nil client.
type DiskServiceClient struct {
c *engineapi.DiskService // underlying client; nil when creation failed
err error // creation error recorded for later reporting; nil on success
}

type NodeMonitor struct {
*baseMonitor

Expand All @@ -56,10 +61,10 @@ type CollectedDiskInfo struct {
OrphanedReplicaDirectoryNames map[string]string
}

type GetDiskStatHandler func(longhorn.DiskType, string, string, *engineapi.DiskService) (*lhtypes.DiskStat, error)
type GetDiskConfigHandler func(longhorn.DiskType, string, string, *engineapi.DiskService) (*util.DiskConfig, error)
type GenerateDiskConfigHandler func(longhorn.DiskType, string, string, string, *engineapi.DiskService) (*util.DiskConfig, error)
type GetReplicaInstanceNamesHandler func(longhorn.DiskType, *longhorn.Node, string, string, string, *engineapi.DiskService) (map[string]string, error)
// GetDiskStatHandler returns the disk stat for the given disk type, name and path.
type GetDiskStatHandler func(longhorn.DiskType, string, string, *DiskServiceClient) (*lhtypes.DiskStat, error)

// GetDiskConfigHandler reads the existing disk config for the given disk type, name and path.
type GetDiskConfigHandler func(longhorn.DiskType, string, string, *DiskServiceClient) (*util.DiskConfig, error)

// GenerateDiskConfigHandler creates a new disk config for the given disk type, name, uuid and path.
type GenerateDiskConfigHandler func(longhorn.DiskType, string, string, string, *DiskServiceClient) (*util.DiskConfig, error)

// GetReplicaInstanceNamesHandler lists the replica instance names on the given node's disk.
type GetReplicaInstanceNamesHandler func(longhorn.DiskType, *longhorn.Node, string, string, string, *DiskServiceClient) (map[string]string, error)

func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*NodeMonitor, error) {
ctx, quit := context.WithCancel(context.Background())
Expand Down Expand Up @@ -188,36 +193,38 @@ func (m *NodeMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngine
return nil, fmt.Errorf("unknown data engine %v", dataEngine)
}

func (m *NodeMonitor) newDiskServiceClients(node *longhorn.Node) (map[longhorn.DataEngineType]*engineapi.DiskService, error) {
clients := map[longhorn.DataEngineType]*engineapi.DiskService{}
func (m *NodeMonitor) newDiskServiceClients(node *longhorn.Node) map[longhorn.DataEngineType]*DiskServiceClient {
clients := map[longhorn.DataEngineType]*DiskServiceClient{}

dataEngines := m.getDataEngines()

for _, dataEngine := range dataEngines {
// TODO: disk service is currently not used by filesystem-type disk for v1 data engine,
// so we can skip it for now.
if datastore.IsDataEngineV1(dataEngine) {
continue
}

var client *engineapi.DiskService
// TODO: disk service is not used by filesystem-type disk, so we can skip it for now.
if datastore.IsDataEngineV2(dataEngine) {
im, err := m.getRunningInstanceManagerRO(dataEngine)
if err != nil {
return clients, errors.Wrapf(err, "failed to get running instance manager for node %v", m.nodeName)
}

im, err := m.getRunningInstanceManagerRO(dataEngine)
if err == nil {
client, err = engineapi.NewDiskServiceClient(im, m.logger)
if err != nil {
return clients, errors.Wrapf(err, "failed to create disk service client for node %v", m.nodeName)
}
}

clients[dataEngine] = client
clients[dataEngine] = &DiskServiceClient{
c: client,
err: err,
}
}

return clients, nil
return clients
}

func (m *NodeMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineType]*engineapi.DiskService) {
func (m *NodeMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) {
for _, client := range clients {
if client != nil {
client.Close()
if client.c != nil {
client.c.Close()
}
}
}
Expand All @@ -226,12 +233,7 @@ func (m *NodeMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineTyp
func (m *NodeMonitor) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo {
diskInfoMap := make(map[string]*CollectedDiskInfo, 0)

diskServiceClients, err := m.newDiskServiceClients(node)
if err != nil {
// If failed to create disk service client, just log a warning and continue.
// The error out will hinder the following logics in syncNode.
m.logger.WithError(err).Warnf("Failed to create disk service client")
}
diskServiceClients := m.newDiskServiceClients(node)
defer func() {
m.closeDiskServiceClients(diskServiceClients)
}()
Expand All @@ -242,13 +244,22 @@ func (m *NodeMonitor) collectDiskData(node *longhorn.Node) map[string]*Collected
orphanedReplicaInstanceNames := map[string]string{}
nodeOrDiskEvicted := isNodeOrDiskEvicted(node, disk)

if diskServiceClient == nil {
// TODO: disk service is not used by filesystem-type disk, so we can skip it for now.
if datastore.IsDataEngineV2(dataEngine) {
// TODO: disk service is currently not used by filesystem-type disk for v1 data engine.
if datastore.IsDataEngineV2(dataEngine) {
errMsg := ""
if diskServiceClient == nil {
errMsg = fmt.Sprintf("Disk %v (%v) on node %v is not ready: data engine is disabled",
diskName, disk.Path, node.Name)
} else {
if diskServiceClient.err != nil {
errMsg = fmt.Sprintf("Disk %v (%v) on node %v is not ready: %v",
diskName, disk.Path, node.Name, diskServiceClient.err.Error())
}
}
if errMsg != "" {
diskInfoMap[diskName] = NewDiskInfo(disk.Path, "", nodeOrDiskEvicted, nil,
orphanedReplicaInstanceNames, string(longhorn.DiskConditionReasonDiskServiceUnreachable),
fmt.Sprintf("Disk %v(%v) on node %v is not ready: disk service is unreachable",
diskName, disk.Path, node.Name))
errMsg)
continue
}
}
Expand Down Expand Up @@ -316,7 +327,7 @@ func isNodeOrDiskEvicted(node *longhorn.Node, disk longhorn.DiskSpec) bool {
return node.Spec.EvictionRequested || disk.EvictionRequested
}

func getReplicaInstanceNames(diskType longhorn.DiskType, node *longhorn.Node, diskName, diskUUID, diskPath string, client *engineapi.DiskService) (map[string]string, error) {
func getReplicaInstanceNames(diskType longhorn.DiskType, node *longhorn.Node, diskName, diskUUID, diskPath string, client *DiskServiceClient) (map[string]string, error) {
switch diskType {
case longhorn.DiskTypeFilesystem:
return getReplicaDirectoryNames(node, diskName, diskUUID, diskPath)
Expand Down
30 changes: 15 additions & 15 deletions controller/monitor/disk_utils.go
Expand Up @@ -25,7 +25,7 @@ const (
)

// GetDiskStat returns the disk stat of the given directory
func getDiskStat(diskType longhorn.DiskType, name, path string, client *engineapi.DiskService) (stat *lhtypes.DiskStat, err error) {
func getDiskStat(diskType longhorn.DiskType, name, path string, client *DiskServiceClient) (stat *lhtypes.DiskStat, err error) {
switch diskType {
case longhorn.DiskTypeFilesystem:
return lhns.GetDiskStat(path)
Expand All @@ -36,12 +36,12 @@ func getDiskStat(diskType longhorn.DiskType, name, path string, client *engineap
}
}

func getBlockTypeDiskStat(client *engineapi.DiskService, name, path string) (stat *lhtypes.DiskStat, err error) {
if client == nil {
func getBlockTypeDiskStat(client *DiskServiceClient, name, path string) (stat *lhtypes.DiskStat, err error) {
if client == nil || client.c == nil {
return nil, errors.New("disk service client is nil")
}

info, err := client.DiskGet(string(longhorn.DiskTypeBlock), name, path)
info, err := client.c.DiskGet(string(longhorn.DiskTypeBlock), name, path)
if err != nil {
return nil, err
}
Expand All @@ -58,7 +58,7 @@ func getBlockTypeDiskStat(client *engineapi.DiskService, name, path string) (sta
}

// GetDiskConfig returns the disk config of the given directory
func getDiskConfig(diskType longhorn.DiskType, name, path string, client *engineapi.DiskService) (*util.DiskConfig, error) {
func getDiskConfig(diskType longhorn.DiskType, name, path string, client *DiskServiceClient) (*util.DiskConfig, error) {
switch diskType {
case longhorn.DiskTypeFilesystem:
return getFilesystemTypeDiskConfig(path)
Expand Down Expand Up @@ -88,12 +88,12 @@ func getFilesystemTypeDiskConfig(path string) (*util.DiskConfig, error) {
return cfg, nil
}

func getBlockTypeDiskConfig(client *engineapi.DiskService, name, path string) (config *util.DiskConfig, err error) {
if client == nil {
func getBlockTypeDiskConfig(client *DiskServiceClient, name, path string) (config *util.DiskConfig, err error) {
if client == nil || client.c == nil {
return nil, errors.New("disk service client is nil")
}

info, err := client.DiskGet(string(longhorn.DiskTypeBlock), name, path)
info, err := client.c.DiskGet(string(longhorn.DiskTypeBlock), name, path)
if err != nil {
if grpcstatus.Code(err) == grpccodes.NotFound {
return nil, errors.Wrapf(err, "cannot find disk info")
Expand All @@ -106,7 +106,7 @@ func getBlockTypeDiskConfig(client *engineapi.DiskService, name, path string) (c
}

// GenerateDiskConfig generates a disk config for the given directory
func generateDiskConfig(diskType longhorn.DiskType, name, uuid, path string, client *engineapi.DiskService) (*util.DiskConfig, error) {
func generateDiskConfig(diskType longhorn.DiskType, name, uuid, path string, client *DiskServiceClient) (*util.DiskConfig, error) {
switch diskType {
case longhorn.DiskTypeFilesystem:
return generateFilesystemTypeDiskConfig(path)
Expand Down Expand Up @@ -159,12 +159,12 @@ func generateFilesystemTypeDiskConfig(path string) (*util.DiskConfig, error) {
return cfg, nil
}

func generateBlockTypeDiskConfig(client *engineapi.DiskService, name, uuid, path string, blockSize int64) (*util.DiskConfig, error) {
if client == nil {
func generateBlockTypeDiskConfig(client *DiskServiceClient, name, uuid, path string, blockSize int64) (*util.DiskConfig, error) {
if client == nil || client.c == nil {
return nil, errors.New("disk service client is nil")
}

info, err := client.DiskCreate(string(longhorn.DiskTypeBlock), name, uuid, path, blockSize)
info, err := client.c.DiskCreate(string(longhorn.DiskTypeBlock), name, uuid, path, blockSize)
if err != nil {
return nil, err
}
Expand All @@ -183,12 +183,12 @@ func DeleteDisk(diskType longhorn.DiskType, diskName, diskUUID string, client *e
}

// getSpdkReplicaInstanceNames returns the replica lvol names of the given disk
func getSpdkReplicaInstanceNames(client *engineapi.DiskService, diskType, diskName string) (map[string]string, error) {
if client == nil {
func getSpdkReplicaInstanceNames(client *DiskServiceClient, diskType, diskName string) (map[string]string, error) {
if client == nil || client.c == nil {
return nil, errors.New("disk service client is nil")
}

instances, err := client.DiskReplicaInstanceList(diskType, diskName)
instances, err := client.c.DiskReplicaInstanceList(diskType, diskName)
if err != nil {
return nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions controller/monitor/fake_disk_monitor.go
Expand Up @@ -10,7 +10,6 @@ import (
lhtypes "github.com/longhorn/go-common-libs/types"

"github.com/longhorn/longhorn-manager/datastore"
"github.com/longhorn/longhorn-manager/engineapi"
"github.com/longhorn/longhorn-manager/util"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
Expand Down Expand Up @@ -45,13 +44,13 @@ func NewFakeNodeMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, node
return m, nil
}

func fakeGetReplicaDirectoryNames(diskType longhorn.DiskType, node *longhorn.Node, diskName, diskUUID, diskPath string, client *engineapi.DiskService) (map[string]string, error) {
// fakeGetReplicaDirectoryNames is a test stub for GetReplicaInstanceNamesHandler.
// It ignores every argument and always reports one fixed orphaned replica
// directory, letting tests exercise the orphan-detection path deterministically.
func fakeGetReplicaDirectoryNames(diskType longhorn.DiskType, node *longhorn.Node, diskName, diskUUID, diskPath string, client *DiskServiceClient) (map[string]string, error) {
	names := make(map[string]string, 1)
	names[TestOrphanedReplicaDirectoryName] = ""
	return names, nil
}

func fakeGetDiskStat(diskType longhorn.DiskType, name, directory string, client *engineapi.DiskService) (*lhtypes.DiskStat, error) {
func fakeGetDiskStat(diskType longhorn.DiskType, name, directory string, client *DiskServiceClient) (*lhtypes.DiskStat, error) {
switch diskType {
case longhorn.DiskTypeFilesystem:
return &lhtypes.DiskStat{
Expand Down Expand Up @@ -82,7 +81,7 @@ func fakeGetDiskStat(diskType longhorn.DiskType, name, directory string, client
}
}

func fakeGetDiskConfig(diskType longhorn.DiskType, name, path string, client *engineapi.DiskService) (*util.DiskConfig, error) {
func fakeGetDiskConfig(diskType longhorn.DiskType, name, path string, client *DiskServiceClient) (*util.DiskConfig, error) {
switch diskType {
case longhorn.DiskTypeFilesystem:
return &util.DiskConfig{
Expand All @@ -97,7 +96,7 @@ func fakeGetDiskConfig(diskType longhorn.DiskType, name, path string, client *en
}
}

func fakeGenerateDiskConfig(diskType longhorn.DiskType, name, uuid, path string, client *engineapi.DiskService) (*util.DiskConfig, error) {
func fakeGenerateDiskConfig(diskType longhorn.DiskType, name, uuid, path string, client *DiskServiceClient) (*util.DiskConfig, error) {
return &util.DiskConfig{
DiskUUID: TestDiskID1,
}, nil
Expand Down
4 changes: 2 additions & 2 deletions controller/node_controller.go
Expand Up @@ -874,7 +874,7 @@ func (nc *NodeController) updateDiskStatusSchedulableCondition(node *longhorn.No
diskStatus.Conditions = types.SetConditionAndRecord(diskStatus.Conditions,
longhorn.DiskConditionTypeSchedulable, longhorn.ConditionStatusFalse,
string(longhorn.DiskConditionReasonDiskNotReady),
fmt.Sprintf("the disk %v(%v) on the node %v is not ready", diskName, disk.Path, node.Name),
fmt.Sprintf("Disk %v (%v) on the node %v is not ready", diskName, disk.Path, node.Name),
nc.eventRecorder, node, corev1.EventTypeWarning)
} else {
// sync backing image managers
Expand Down Expand Up @@ -926,7 +926,7 @@ func (nc *NodeController) updateDiskStatusSchedulableCondition(node *longhorn.No
diskStatus.Conditions = types.SetConditionAndRecord(diskStatus.Conditions,
longhorn.DiskConditionTypeSchedulable, longhorn.ConditionStatusFalse,
string(longhorn.DiskConditionReasonDiskPressure),
fmt.Sprintf("the disk %v(%v) on the node %v has %v available, but requires reserved %v, minimal %v%s to schedule more replicas",
fmt.Sprintf("Disk %v (%v) on the node %v has %v available, but requires reserved %v, minimal %v%s to schedule more replicas",
diskName, disk.Path, node.Name, diskStatus.StorageAvailable, disk.StorageReserved, minimalAvailablePercentage, "%"),
nc.eventRecorder, node, corev1.EventTypeWarning)
} else {
Expand Down

0 comments on commit b865e37

Please sign in to comment.