Skip to content

Commit

Permalink
local volume: implement scheduling logic
Browse files Browse the repository at this point in the history
Longhorn 3957

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit committed Nov 25, 2022
1 parent bcd70a0 commit 5b708c6
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 20 deletions.
16 changes: 13 additions & 3 deletions controller/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (ec *EngineController) syncEngine(key string) (err error) {
}

if engine.Status.CurrentState == longhorn.InstanceStateRunning {
// we allow across monitoring temporaily due to migration case
// we allow across monitoring temporarily due to migration case
if !ec.isMonitoring(engine) {
ec.startMonitoring(engine)
} else if engine.Status.ReplicaModeMap != nil {
Expand Down Expand Up @@ -434,12 +434,17 @@ func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceP
return nil, err
}

v, err := ec.ds.GetVolume(e.Spec.VolumeName)
if err != nil {
return nil, err
}

engineCLIAPIVersion, err := ec.ds.GetEngineImageCLIAPIVersion(e.Spec.EngineImage)
if err != nil {
return nil, err
}

return c.EngineProcessCreate(e, frontend, engineReplicaTimeout, engineCLIAPIVersion)
return c.EngineProcessCreate(e, frontend, engineReplicaTimeout, v.Spec.DataLocality, engineCLIAPIVersion)
}

func (ec *EngineController) DeleteInstance(obj interface{}) error {
Expand Down Expand Up @@ -1694,12 +1699,17 @@ func (ec *EngineController) UpgradeEngineProcess(e *longhorn.Engine) error {
return err
}

v, err := ec.ds.GetVolume(e.Spec.VolumeName)
if err != nil {
return err
}

engineCLIAPIVersion, err := ec.ds.GetEngineImageCLIAPIVersion(e.Spec.EngineImage)
if err != nil {
return err
}

engineProcess, err := c.EngineProcessUpgrade(e, frontend, engineReplicaTimeout, engineCLIAPIVersion)
engineProcess, err := c.EngineProcessUpgrade(e, frontend, engineReplicaTimeout, v.Spec.DataLocality, engineCLIAPIVersion)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion controller/replica_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,12 @@ func (rc *ReplicaController) CreateInstance(obj interface{}) (*longhorn.Instance
}
defer c.Close()

return c.ReplicaProcessCreate(r, dataPath, backingImagePath)
v, err := rc.ds.GetVolume(r.Spec.VolumeName)
if err != nil {
return nil, err
}

return c.ReplicaProcessCreate(r, dataPath, backingImagePath, v.Spec.DataLocality)
}

func (rc *ReplicaController) GetBackingImagePathForReplicaStarting(r *longhorn.Replica) (string, error) {
Expand Down
28 changes: 23 additions & 5 deletions controller/volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ func (vc *VolumeController) ReconcileEngineReplicaState(v *longhorn.Volume, es m
// Migrate local replica when Data Locality is on
// We turn off data locality while doing auto-attaching or restoring (e.g. frontend is disabled)
if v.Status.State == longhorn.VolumeStateAttached && !v.Status.FrontendDisabled &&
!isDataLocalityDisabled(v) && !hasLocalReplicaOnSameNodeAsEngine(e, rs) {
isDataLocalityBestEffort(v) && !hasLocalReplicaOnSameNodeAsEngine(e, rs) {
if err := vc.replenishReplicas(v, e, rs, e.Spec.NodeID); err != nil {
return err
}
Expand Down Expand Up @@ -1124,7 +1124,8 @@ func (vc *VolumeController) ReconcileVolumeState(v *longhorn.Volume, es map[stri
if len(e.Status.Snapshots) > VolumeSnapshotsWarningThreshold {
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeTooManySnapshots, longhorn.ConditionStatusTrue,
longhorn.VolumeConditionReasonTooManySnapshots, fmt.Sprintf("Snapshots count is %v over the warning threshold %v", len(e.Status.Snapshots), VolumeSnapshotsWarningThreshold))
longhorn.VolumeConditionReasonTooManySnapshots, fmt.Sprintf("Snapshots count is %v over the warning threshold %v",
len(e.Status.Snapshots), VolumeSnapshotsWarningThreshold))
} else {
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeTooManySnapshots, longhorn.ConditionStatusFalse,
Expand All @@ -1147,6 +1148,14 @@ func (vc *VolumeController) ReconcileVolumeState(v *longhorn.Volume, es map[stri
if r.Spec.NodeID != "" {
continue
}
if v.Spec.DataLocality == longhorn.DataLocalityStrictLocal {
if v.Spec.NodeID == "" {
continue
}

r.Spec.HardNodeAffinity = v.Spec.NodeID
}

scheduledReplica, multiError, err := vc.scheduler.ScheduleReplica(r, rs, v)
if err != nil {
return err
Expand Down Expand Up @@ -1213,7 +1222,7 @@ func (vc *VolumeController) ReconcileVolumeState(v *longhorn.Volume, es map[stri
}

if err := vc.ds.UpdatePVAnnotation(v, types.PVAnnotationLonghornVolumeSchedulingError, failureMessage); err != nil {
log.Warnf("Cannot update PV annotation for volume %v", v.Name)
log.WithError(err).Warnf("Cannot update PV annotation for volume %v", v.Name)
}

if err := vc.reconcileVolumeSize(v, e, rs); err != nil {
Expand Down Expand Up @@ -1679,6 +1688,14 @@ func findValueWithBiggestLength(m map[string][]string) []string {
return m[targetKey]
}

// isDataLocalityBestEffort reports whether the volume's data locality
// setting is DataLocalityBestEffort.
func isDataLocalityBestEffort(v *longhorn.Volume) bool {
	return v.Spec.DataLocality == longhorn.DataLocalityBestEffort
}

// isDataLocalityStrictLocal reports whether the volume's data locality
// setting is DataLocalityStrictLocal.
func isDataLocalityStrictLocal(v *longhorn.Volume) bool {
	return v.Spec.DataLocality == longhorn.DataLocalityStrictLocal
}

// isDataLocalityDisabled reports whether data locality is off for the
// volume, either explicitly (DataLocalityDisabled) or because the spec
// field was never set (empty string).
func isDataLocalityDisabled(v *longhorn.Volume) bool {
	return string(v.Spec.DataLocality) == "" || v.Spec.DataLocality == longhorn.DataLocalityDisabled
}
Expand Down Expand Up @@ -2131,7 +2148,7 @@ func (vc *VolumeController) getReplenishReplicasCount(v *longhorn.Volume, rs map
usableCount := 0
for _, r := range rs {
// The failed to schedule local replica shouldn't be counted
if !isDataLocalityDisabled(v) && r.Spec.HealthyAt == "" && r.Spec.FailedAt == "" && r.Spec.NodeID == "" &&
if isDataLocalityBestEffort(v) && r.Spec.HealthyAt == "" && r.Spec.FailedAt == "" && r.Spec.NodeID == "" &&
v.Status.CurrentNodeID != "" && r.Spec.HardNodeAffinity == v.Status.CurrentNodeID {
continue
}
Expand Down Expand Up @@ -2921,7 +2938,8 @@ func (vc *VolumeController) createEngine(v *longhorn.Volume, isNewEngine bool) (
return vc.ds.CreateEngine(engine)
}

func (vc *VolumeController) createReplica(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, hardNodeAffinity string, isRebuildingReplica bool) error {
func (vc *VolumeController) createReplica(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica,
hardNodeAffinity string, isRebuildingReplica bool) error {
log := getLoggerForVolume(vc.logger, v)

replica := &longhorn.Replica{
Expand Down
13 changes: 8 additions & 5 deletions datastore/longhorn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,16 +1366,19 @@ func (s *DataStore) CheckEngineImageReadiness(image string, nodes ...string) (is
}

// CheckEngineImageReadyOnAtLeastOneVolumeReplica checks if the IMAGE is deployed on the NODEID and on at least one of the volume's replicas
func (s *DataStore) CheckEngineImageReadyOnAtLeastOneVolumeReplica(image, volumeName, nodeID string) (bool, error) {
func (s *DataStore) CheckEngineImageReadyOnAtLeastOneVolumeReplica(image, volumeName, nodeID string, dataLocality longhorn.DataLocality) (bool, error) {
isReady, err := s.CheckEngineImageReadiness(image, nodeID)

if (dataLocality == longhorn.DataLocalityStrictLocal) ||
(nodeID != "" && !isReady) {
return isReady, err
}

replicas, err := s.ListVolumeReplicas(volumeName)
if err != nil {
return false, fmt.Errorf("cannot get replicas for volume %v: %w", volumeName, err)
}

isReady, err := s.CheckEngineImageReadiness(image, nodeID)
if nodeID != "" && !isReady {
return isReady, err
}
for _, r := range replicas {
isReady, err := s.CheckEngineImageReadiness(image, r.Spec.NodeID)
if err != nil || isReady {
Expand Down
21 changes: 18 additions & 3 deletions engineapi/instance_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (c *InstanceManagerClient) parseProcess(p *imapi.Process) *longhorn.Instanc

}

func (c *InstanceManagerClient) EngineProcessCreate(e *longhorn.Engine, volumeFrontend longhorn.VolumeFrontend, engineReplicaTimeout int64, engineCLIAPIVersion int) (*longhorn.InstanceProcess, error) {
func (c *InstanceManagerClient) EngineProcessCreate(e *longhorn.Engine, volumeFrontend longhorn.VolumeFrontend, engineReplicaTimeout int64, dataLocality longhorn.DataLocality, engineCLIAPIVersion int) (*longhorn.InstanceProcess, error) {
if err := CheckInstanceManagerCompatibility(c.apiMinVersion, c.apiVersion); err != nil {
return nil, err
}
Expand Down Expand Up @@ -178,6 +178,10 @@ func (c *InstanceManagerClient) EngineProcessCreate(e *longhorn.Engine, volumeFr
if engineCLIAPIVersion >= 7 {
args = append(args,
"--engine-replica-timeout", strconv.FormatInt(engineReplicaTimeout, 10))

if dataLocality == longhorn.DataLocalityStrictLocal {
args = append(args, "--data-server-protocol", "unix")
}
}

for _, addr := range e.Status.CurrentReplicaAddressMap {
Expand All @@ -193,7 +197,7 @@ func (c *InstanceManagerClient) EngineProcessCreate(e *longhorn.Engine, volumeFr
return c.parseProcess(engineProcess), nil
}

func (c *InstanceManagerClient) ReplicaProcessCreate(replica *longhorn.Replica, dataPath, backingImagePath string) (*longhorn.InstanceProcess, error) {
func (c *InstanceManagerClient) ReplicaProcessCreate(replica *longhorn.Replica, dataPath, backingImagePath string, dataLocality longhorn.DataLocality) (*longhorn.InstanceProcess, error) {
if err := CheckInstanceManagerCompatibility(c.apiMinVersion, c.apiVersion); err != nil {
return nil, err
}
Expand All @@ -208,6 +212,12 @@ func (c *InstanceManagerClient) ReplicaProcessCreate(replica *longhorn.Replica,
args = append(args, "--disableRevCounter")
}

args = append(args, "--volume-name", replica.Spec.VolumeName)

if dataLocality == longhorn.DataLocalityStrictLocal {
args = append(args, "--data-server-protocol", "unix")
}

binary := filepath.Join(types.GetEngineBinaryDirectoryForReplicaManagerContainer(replica.Spec.EngineImage), types.EngineBinaryName)

replicaProcess, err := c.grpcClient.ProcessCreate(
Expand Down Expand Up @@ -267,7 +277,7 @@ func (c *InstanceManagerClient) ProcessList() (map[string]longhorn.InstanceProce
return result, nil
}

func (c *InstanceManagerClient) EngineProcessUpgrade(e *longhorn.Engine, volumeFrontend longhorn.VolumeFrontend, engineReplicaTimeout int64, engineCLIAPIVersion int) (*longhorn.InstanceProcess, error) {
func (c *InstanceManagerClient) EngineProcessUpgrade(e *longhorn.Engine, volumeFrontend longhorn.VolumeFrontend, engineReplicaTimeout int64, dataLocality longhorn.DataLocality, engineCLIAPIVersion int) (*longhorn.InstanceProcess, error) {
if err := CheckInstanceManagerCompatibility(c.apiMinVersion, c.apiVersion); err != nil {
return nil, err
}
Expand All @@ -289,6 +299,11 @@ func (c *InstanceManagerClient) EngineProcessUpgrade(e *longhorn.Engine, volumeF
if engineCLIAPIVersion >= 7 {
args = append(args,
"--engine-replica-timeout", strconv.FormatInt(engineReplicaTimeout, 10))

if dataLocality == longhorn.DataLocalityStrictLocal {
args = append(args,
"--data-server-protocol", "unix")
}
}

binary := filepath.Join(types.GetEngineBinaryDirectoryForEngineManagerContainer(e.Spec.EngineImage), types.EngineBinaryName)
Expand Down
2 changes: 1 addition & 1 deletion manager/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (m *VolumeManager) Attach(name, nodeID string, disableFrontend bool, attach
return nil, err
}

if isReady, err := m.ds.CheckEngineImageReadyOnAtLeastOneVolumeReplica(v.Spec.EngineImage, v.Name, nodeID); !isReady {
if isReady, err := m.ds.CheckEngineImageReadyOnAtLeastOneVolumeReplica(v.Spec.EngineImage, v.Name, nodeID, v.Spec.DataLocality); !isReady {
if err != nil {
return nil, fmt.Errorf("cannot attach volume %v with image %v: %v", v.Name, v.Spec.EngineImage, err)
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/replica_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (rcs *ReplicaScheduler) CheckAndReuseFailedReplica(replicas map[string]*lon
if exists {
disks[r.Spec.DiskID] = struct{}{}
} else {
disks = map[string]struct{}{r.Spec.DiskID: struct{}{}}
disks = map[string]struct{}{r.Spec.DiskID: {}}
}
availableNodesInfo[r.Spec.NodeID] = allNodesInfo[r.Spec.NodeID]
availableNodeDisksMap[r.Spec.NodeID] = disks
Expand Down Expand Up @@ -472,7 +472,7 @@ func (rcs *ReplicaScheduler) CheckAndReuseFailedReplica(replicas map[string]*lon
}
}
if reusedReplica == nil {
logrus.Infof("Cannot find a reusable failed replicas")
logrus.Infof("Cannot find a reusable failed replicas for volume %v", volume.Name)
return nil, nil
}

Expand Down

0 comments on commit 5b708c6

Please sign in to comment.