From 5b708c63afd9ef42c15062d15382a63ab40bb057 Mon Sep 17 00:00:00 2001
From: Derek Su
Date: Tue, 22 Nov 2022 18:09:16 +0800
Subject: [PATCH] local volume: implement scheduling logic

Longhorn 3957

Signed-off-by: Derek Su
---
 controller/engine_controller.go  | 16 +++++++++++++---
 controller/replica_controller.go |  7 ++++++-
 controller/volume_controller.go  | 28 +++++++++++++++++++++++-----
 datastore/longhorn.go            | 13 ++++++++-----
 engineapi/instance_manager.go    | 21 ++++++++++++++++++---
 manager/volume.go                |  2 +-
 scheduler/replica_scheduler.go   |  4 ++--
 7 files changed, 71 insertions(+), 20 deletions(-)

diff --git a/controller/engine_controller.go b/controller/engine_controller.go
index bd7e51d392..ef446a20f8 100644
--- a/controller/engine_controller.go
+++ b/controller/engine_controller.go
@@ -334,7 +334,7 @@ func (ec *EngineController) syncEngine(key string) (err error) {
 	}
 
 	if engine.Status.CurrentState == longhorn.InstanceStateRunning {
-		// we allow across monitoring temporaily due to migration case
+		// we allow across monitoring temporarily due to migration case
 		if !ec.isMonitoring(engine) {
 			ec.startMonitoring(engine)
 		} else if engine.Status.ReplicaModeMap != nil {
@@ -434,12 +434,17 @@ func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceP
 		return nil, err
 	}
 
+	v, err := ec.ds.GetVolume(e.Spec.VolumeName)
+	if err != nil {
+		return nil, err
+	}
+
 	engineCLIAPIVersion, err := ec.ds.GetEngineImageCLIAPIVersion(e.Spec.EngineImage)
 	if err != nil {
 		return nil, err
 	}
 
-	return c.EngineProcessCreate(e, frontend, engineReplicaTimeout, engineCLIAPIVersion)
+	return c.EngineProcessCreate(e, frontend, engineReplicaTimeout, v.Spec.DataLocality, engineCLIAPIVersion)
 }
 
 func (ec *EngineController) DeleteInstance(obj interface{}) error {
@@ -1694,12 +1699,17 @@ func (ec *EngineController) UpgradeEngineProcess(e *longhorn.Engine) error {
 		return err
 	}
 
+	v, err := ec.ds.GetVolume(e.Spec.VolumeName)
+	if err != nil {
+		return err
+	}
+
 	engineCLIAPIVersion, err := ec.ds.GetEngineImageCLIAPIVersion(e.Spec.EngineImage)
 	if err != nil {
 		return err
 	}
 
-	engineProcess, err := c.EngineProcessUpgrade(e, frontend, engineReplicaTimeout, engineCLIAPIVersion)
+	engineProcess, err := c.EngineProcessUpgrade(e, frontend, engineReplicaTimeout, v.Spec.DataLocality, engineCLIAPIVersion)
 	if err != nil {
 		return err
 	}
diff --git a/controller/replica_controller.go b/controller/replica_controller.go
index 27ad4be237..662d0764b3 100644
--- a/controller/replica_controller.go
+++ b/controller/replica_controller.go
@@ -397,7 +397,12 @@ func (rc *ReplicaController) CreateInstance(obj interface{}) (*longhorn.Instance
 	}
 	defer c.Close()
 
-	return c.ReplicaProcessCreate(r, dataPath, backingImagePath)
+	v, err := rc.ds.GetVolume(r.Spec.VolumeName)
+	if err != nil {
+		return nil, err
+	}
+
+	return c.ReplicaProcessCreate(r, dataPath, backingImagePath, v.Spec.DataLocality)
 }
 
 func (rc *ReplicaController) GetBackingImagePathForReplicaStarting(r *longhorn.Replica) (string, error) {
diff --git a/controller/volume_controller.go b/controller/volume_controller.go
index 4aa146b5cb..d91846cac8 100644
--- a/controller/volume_controller.go
+++ b/controller/volume_controller.go
@@ -646,7 +646,7 @@ func (vc *VolumeController) ReconcileEngineReplicaState(v *longhorn.Volume, es m
 	// Migrate local replica when Data Locality is on
 	// We turn off data locality while doing auto-attaching or restoring (e.g. frontend is disabled)
 	if v.Status.State == longhorn.VolumeStateAttached && !v.Status.FrontendDisabled &&
-		!isDataLocalityDisabled(v) && !hasLocalReplicaOnSameNodeAsEngine(e, rs) {
+		isDataLocalityBestEffort(v) && !hasLocalReplicaOnSameNodeAsEngine(e, rs) {
 		if err := vc.replenishReplicas(v, e, rs, e.Spec.NodeID); err != nil {
 			return err
 		}
@@ -1124,7 +1124,8 @@ func (vc *VolumeController) ReconcileVolumeState(v *longhorn.Volume, es map[stri
 		if len(e.Status.Snapshots) > VolumeSnapshotsWarningThreshold {
 			v.Status.Conditions = types.SetCondition(v.Status.Conditions,
 				longhorn.VolumeConditionTypeTooManySnapshots, longhorn.ConditionStatusTrue,
-				longhorn.VolumeConditionReasonTooManySnapshots, fmt.Sprintf("Snapshots count is %v over the warning threshold %v", len(e.Status.Snapshots), VolumeSnapshotsWarningThreshold))
+				longhorn.VolumeConditionReasonTooManySnapshots, fmt.Sprintf("Snapshots count is %v over the warning threshold %v",
+					len(e.Status.Snapshots), VolumeSnapshotsWarningThreshold))
 		} else {
 			v.Status.Conditions = types.SetCondition(v.Status.Conditions,
 				longhorn.VolumeConditionTypeTooManySnapshots, longhorn.ConditionStatusFalse,
@@ -1147,6 +1148,14 @@ func (vc *VolumeController) ReconcileVolumeState(v *longhorn.Volume, es map[stri
 		if r.Spec.NodeID != "" {
 			continue
 		}
+		if v.Spec.DataLocality == longhorn.DataLocalityStrictLocal {
+			if v.Spec.NodeID == "" {
+				continue
+			}
+
+			r.Spec.HardNodeAffinity = v.Spec.NodeID
+		}
+
 		scheduledReplica, multiError, err := vc.scheduler.ScheduleReplica(r, rs, v)
 		if err != nil {
 			return err
 		}
@@ -1213,7 +1222,7 @@
 		}
 
 		if err := vc.ds.UpdatePVAnnotation(v, types.PVAnnotationLonghornVolumeSchedulingError, failureMessage); err != nil {
-			log.Warnf("Cannot update PV annotation for volume %v", v.Name)
+			log.WithError(err).Warnf("Cannot update PV annotation for volume %v", v.Name)
 		}
 
 		if err := vc.reconcileVolumeSize(v, e, rs); err != nil {
@@ -1679,6 +1688,14 @@ func findValueWithBiggestLength(m map[string][]string) []string {
 	return m[targetKey]
 }
 
+func isDataLocalityBestEffort(v *longhorn.Volume) bool {
+	return v.Spec.DataLocality == longhorn.DataLocalityBestEffort
+}
+
+func isDataLocalityStrictLocal(v *longhorn.Volume) bool {
+	return v.Spec.DataLocality == longhorn.DataLocalityStrictLocal
+}
+
 func isDataLocalityDisabled(v *longhorn.Volume) bool {
 	return string(v.Spec.DataLocality) == "" || v.Spec.DataLocality == longhorn.DataLocalityDisabled
 }
@@ -2131,7 +2148,7 @@ func (vc *VolumeController) getReplenishReplicasCount(v *longhorn.Volume, rs map
 	usableCount := 0
 	for _, r := range rs {
 		// The failed to schedule local replica shouldn't be counted
-		if !isDataLocalityDisabled(v) && r.Spec.HealthyAt == "" && r.Spec.FailedAt == "" && r.Spec.NodeID == "" &&
+		if isDataLocalityBestEffort(v) && r.Spec.HealthyAt == "" && r.Spec.FailedAt == "" && r.Spec.NodeID == "" &&
 			v.Status.CurrentNodeID != "" && r.Spec.HardNodeAffinity == v.Status.CurrentNodeID {
 			continue
 		}
@@ -2921,7 +2938,8 @@ func (vc *VolumeController) createEngine(v *longhorn.Volume, isNewEngine bool) (
 	return vc.ds.CreateEngine(engine)
 }
 
-func (vc *VolumeController) createReplica(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, hardNodeAffinity string, isRebuildingReplica bool) error {
+func (vc *VolumeController) createReplica(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica,
+	hardNodeAffinity string, isRebuildingReplica bool) error {
 	log := getLoggerForVolume(vc.logger, v)
 
 	replica := &longhorn.Replica{
diff --git a/datastore/longhorn.go b/datastore/longhorn.go
index 5fd722cd01..f50db3584f 100644
--- a/datastore/longhorn.go
+++ b/datastore/longhorn.go
@@ -1366,16 +1366,19 @@ func (s *DataStore) CheckEngineImageReadiness(image string, nodes ...string) (is
 }
 
 // CheckEngineImageReadyOnAtLeastOneVolumeReplica checks if the IMAGE is deployed on the NODEID and on at least one of the the volume's replicas
-func (s *DataStore) CheckEngineImageReadyOnAtLeastOneVolumeReplica(image, volumeName, nodeID string) (bool, error) {
+func (s *DataStore) CheckEngineImageReadyOnAtLeastOneVolumeReplica(image, volumeName, nodeID string, dataLocality longhorn.DataLocality) (bool, error) {
+	isReady, err := s.CheckEngineImageReadiness(image, nodeID)
+
+	if (dataLocality == longhorn.DataLocalityStrictLocal) ||
+		(nodeID != "" && !isReady) {
+		return isReady, err
+	}
+
 	replicas, err := s.ListVolumeReplicas(volumeName)
 	if err != nil {
 		return false, fmt.Errorf("cannot get replicas for volume %v: %w", volumeName, err)
 	}
 
-	isReady, err := s.CheckEngineImageReadiness(image, nodeID)
-	if nodeID != "" && !isReady {
-		return isReady, err
-	}
 	for _, r := range replicas {
 		isReady, err := s.CheckEngineImageReadiness(image, r.Spec.NodeID)
 		if err != nil || isReady {
diff --git a/engineapi/instance_manager.go b/engineapi/instance_manager.go
index 2a1af4b189..4458a85df8 100644
--- a/engineapi/instance_manager.go
+++ b/engineapi/instance_manager.go
@@ -149,7 +149,7 @@ func (c *InstanceManagerClient) parseProcess(p *imapi.Process) *longhorn.Instanc
 
 }
 
-func (c *InstanceManagerClient) EngineProcessCreate(e *longhorn.Engine, volumeFrontend longhorn.VolumeFrontend, engineReplicaTimeout int64, engineCLIAPIVersion int) (*longhorn.InstanceProcess, error) {
+func (c *InstanceManagerClient) EngineProcessCreate(e *longhorn.Engine, volumeFrontend longhorn.VolumeFrontend, engineReplicaTimeout int64, dataLocality longhorn.DataLocality, engineCLIAPIVersion int) (*longhorn.InstanceProcess, error) {
 	if err := CheckInstanceManagerCompatibility(c.apiMinVersion, c.apiVersion); err != nil {
 		return nil, err
 	}
@@ -178,6 +178,10 @@ func (c *InstanceManagerClient) EngineProcessCreate(e *longhorn.Engine, volumeFr
 
 	if engineCLIAPIVersion >= 7 {
 		args = append(args, "--engine-replica-timeout", strconv.FormatInt(engineReplicaTimeout, 10))
+
+		if dataLocality == longhorn.DataLocalityStrictLocal {
+			args = append(args, "--data-server-protocol", "unix")
+		}
 	}
 
 	for _, addr := range e.Status.CurrentReplicaAddressMap {
@@ -193,7 +197,7 @@ func (c *InstanceManagerClient) EngineProcessCreate(e *longhorn.Engine, volumeFr
 	return c.parseProcess(engineProcess), nil
 }
 
-func (c *InstanceManagerClient) ReplicaProcessCreate(replica *longhorn.Replica, dataPath, backingImagePath string) (*longhorn.InstanceProcess, error) {
+func (c *InstanceManagerClient) ReplicaProcessCreate(replica *longhorn.Replica, dataPath, backingImagePath string, dataLocality longhorn.DataLocality) (*longhorn.InstanceProcess, error) {
 	if err := CheckInstanceManagerCompatibility(c.apiMinVersion, c.apiVersion); err != nil {
 		return nil, err
 	}
@@ -208,6 +212,12 @@ func (c *InstanceManagerClient) ReplicaProcessCreate(replica *longhorn.Replica,
 		args = append(args, "--disableRevCounter")
 	}
 
+	args = append(args, "--volume-name", replica.Spec.VolumeName)
+
+	if dataLocality == longhorn.DataLocalityStrictLocal {
+		args = append(args, "--data-server-protocol", "unix")
+	}
+
 	binary := filepath.Join(types.GetEngineBinaryDirectoryForReplicaManagerContainer(replica.Spec.EngineImage), types.EngineBinaryName)
 
 	replicaProcess, err := c.grpcClient.ProcessCreate(
@@ -267,7 +277,7 @@ func (c *InstanceManagerClient) ProcessList() (map[string]longhorn.InstanceProce
 	return result, nil
 }
 
-func (c *InstanceManagerClient) EngineProcessUpgrade(e *longhorn.Engine, volumeFrontend longhorn.VolumeFrontend, engineReplicaTimeout int64, engineCLIAPIVersion int) (*longhorn.InstanceProcess, error) {
+func (c *InstanceManagerClient) EngineProcessUpgrade(e *longhorn.Engine, volumeFrontend longhorn.VolumeFrontend, engineReplicaTimeout int64, dataLocality longhorn.DataLocality, engineCLIAPIVersion int) (*longhorn.InstanceProcess, error) {
 	if err := CheckInstanceManagerCompatibility(c.apiMinVersion, c.apiVersion); err != nil {
 		return nil, err
 	}
@@ -289,6 +299,11 @@ func (c *InstanceManagerClient) EngineProcessUpgrade(e *longhorn.Engine, volumeF
 
 	if engineCLIAPIVersion >= 7 {
 		args = append(args, "--engine-replica-timeout", strconv.FormatInt(engineReplicaTimeout, 10))
+
+		if dataLocality == longhorn.DataLocalityStrictLocal {
+			args = append(args,
+				"--data-server-protocol", "unix")
+		}
 	}
 
 	binary := filepath.Join(types.GetEngineBinaryDirectoryForEngineManagerContainer(e.Spec.EngineImage), types.EngineBinaryName)
diff --git a/manager/volume.go b/manager/volume.go
index 660938f503..1e1992f9a8 100644
--- a/manager/volume.go
+++ b/manager/volume.go
@@ -231,7 +231,7 @@ func (m *VolumeManager) Attach(name, nodeID string, disableFrontend bool, attach
 		return nil, err
 	}
 
-	if isReady, err := m.ds.CheckEngineImageReadyOnAtLeastOneVolumeReplica(v.Spec.EngineImage, v.Name, nodeID); !isReady {
+	if isReady, err := m.ds.CheckEngineImageReadyOnAtLeastOneVolumeReplica(v.Spec.EngineImage, v.Name, nodeID, v.Spec.DataLocality); !isReady {
 		if err != nil {
 			return nil, fmt.Errorf("cannot attach volume %v with image %v: %v", v.Name, v.Spec.EngineImage, err)
 		}
diff --git a/scheduler/replica_scheduler.go b/scheduler/replica_scheduler.go
index cc82d77174..1b17a54d57 100644
--- a/scheduler/replica_scheduler.go
+++ b/scheduler/replica_scheduler.go
@@ -444,7 +444,7 @@ func (rcs *ReplicaScheduler) CheckAndReuseFailedReplica(replicas map[string]*lon
 		if exists {
 			disks[r.Spec.DiskID] = struct{}{}
 		} else {
-			disks = map[string]struct{}{r.Spec.DiskID: struct{}{}}
+			disks = map[string]struct{}{r.Spec.DiskID: {}}
 		}
 		availableNodesInfo[r.Spec.NodeID] = allNodesInfo[r.Spec.NodeID]
 		availableNodeDisksMap[r.Spec.NodeID] = disks
@@ -472,7 +472,7 @@ func (rcs *ReplicaScheduler) CheckAndReuseFailedReplica(replicas map[string]*lon
 		}
 	}
 	if reusedReplica == nil {
-		logrus.Infof("Cannot find a reusable failed replicas")
+		logrus.Infof("Cannot find a reusable failed replicas for volume %v", volume.Name)
 		return nil, nil
 	}
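
Reviewer note (illustration only, not part of the patch): the controller/volume_controller.go hunk starting at new line 1148 is the core of the scheduling change. For a strict-local volume, an unscheduled replica is skipped until the volume itself has a node (v.Spec.NodeID), and is then pinned to that node through HardNodeAffinity before ScheduleReplica runs. Below is a minimal, self-contained Go sketch of that gate; the DataLocality constants are illustrative stand-ins for the longhorn package types, not code from this repository.

package main

import "fmt"

// DataLocality mirrors the longhorn.DataLocality string type for illustration only.
type DataLocality string

const (
	DataLocalityDisabled    DataLocality = "disabled"
	DataLocalityBestEffort  DataLocality = "best-effort"
	DataLocalityStrictLocal DataLocality = "strict-local"
)

// hardNodeAffinityFor sketches the gate added in ReconcileVolumeState:
// non-strict-local replicas keep free node placement, while strict-local
// replicas wait for the volume's node and are then pinned to it.
func hardNodeAffinityFor(locality DataLocality, volumeNodeID string) (affinity string, schedulable bool) {
	if locality != DataLocalityStrictLocal {
		// disabled/best-effort: the scheduler may pick any eligible node
		return "", true
	}
	if volumeNodeID == "" {
		// strict-local but the volume has no node yet: defer scheduling this replica
		return "", false
	}
	// strict-local with a node: pin the replica to the volume's node
	return volumeNodeID, true
}

func main() {
	fmt.Println(hardNodeAffinityFor(DataLocalityStrictLocal, ""))       // "" false
	fmt.Println(hardNodeAffinityFor(DataLocalityStrictLocal, "node-1")) // "node-1" true
	fmt.Println(hardNodeAffinityFor(DataLocalityBestEffort, ""))        // "" true
}

On the data path, the engineapi/instance_manager.go hunks pass "--data-server-protocol unix" to both the engine and replica processes when the volume is strict-local, so the single local replica is served over a Unix-domain socket rather than TCP.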