Merge pull request #590 from ystia/feature/GH-83-hostpool-placement
Feature/gh 83 hostpool placement
stebenoist committed Feb 5, 2020
2 parents 0b05622 + 88c74d2 commit 4fc9a29
Showing 12 changed files with 505 additions and 283 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

## UNRELEASED

### FEATURES

* Host election for compute allocation from hosts pools can be made more relevant by applying placement policies ([GH-83](https://github.com/ystia/yorc/issues/83))

### BUG FIXES

* Tosca public_ip_address attribute is wrongly set with private address for hosts pool computes ([GH-593](https://github.com/ystia/yorc/issues/593))
22 changes: 21 additions & 1 deletion data/tosca/yorc-hostspool-types.yml
@@ -3,7 +3,7 @@ tosca_definitions_version: yorc_tosca_simple_yaml_1_0
metadata:
template_name: yorc-hostspool-types
template_author: yorc
template_version: 1.0.0
template_version: 1.1.0

imports:
- yorc: <yorc-types.yml>
@@ -33,3 +33,23 @@ node_types:
properties:
credentials:
user: "not significant, will be set by yorc itself"

policy_types:
yorc.policies.hostspool.Placement:
abstract: true
derived_from: tosca.policies.Placement
description: The abstract base yorc hosts pool TOSCA placement policy.

yorc.policies.hostspool.WeightBalancedPlacement:
derived_from: yorc.policies.hostspool.Placement
description: >
A yorc hosts pool placement policy that allocates hosts using a weight-balanced algorithm:
the least-allocated host is elected preferentially.
targets: [ tosca.nodes.Compute ]

yorc.policies.hostspool.BinPackingPlacement:
derived_from: yorc.policies.hostspool.Placement
description: >
A yorc hosts pool placement policy that allocates hosts using a bin packing algorithm:
the most-allocated host is elected preferentially.
targets: [ tosca.nodes.Compute ]
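
To make the difference between the two policies concrete, below is a minimal, hypothetical Go sketch of electing a host from a pool by its current number of allocations. The poolHost type and electHost function are illustrative only; they are not part of the yorc code base, which performs the election inside the hosts pool manager.

package main

import "fmt"

// poolHost is an illustrative stand-in for a hosts pool entry.
type poolHost struct {
	name        string
	allocations int // number of allocations currently hosted
}

// electHost elects a host according to the placement policy type:
// WeightBalancedPlacement prefers the least-allocated host,
// BinPackingPlacement prefers the most-allocated one.
func electHost(hosts []poolHost, policyType string) (string, error) {
	if len(hosts) == 0 {
		return "", fmt.Errorf("no eligible host in the pool")
	}
	best := hosts[0]
	for _, h := range hosts[1:] {
		if policyType == "yorc.policies.hostspool.BinPackingPlacement" {
			if h.allocations > best.allocations {
				best = h
			}
		} else if h.allocations < best.allocations { // weight-balanced fallback in this sketch
			best = h
		}
	}
	return best.name, nil
}

func main() {
	hosts := []poolHost{{"host1", 2}, {"host2", 0}, {"host3", 5}}
	h, _ := electHost(hosts, "yorc.policies.hostspool.WeightBalancedPlacement")
	fmt.Println("weight-balanced elects:", h) // host2
	h, _ = electHost(hosts, "yorc.policies.hostspool.BinPackingPlacement")
	fmt.Println("bin packing elects:", h) // host3
}
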
1 change: 1 addition & 0 deletions helper/stringutil/stringutil_test.go
@@ -36,6 +36,7 @@ func TestGetLastElement(t *testing.T) {
}{
{name: "TestWithSeparator", args: args{str: "tosca.interfaces.node.lifecycle.standard.create", separator: "."}, expected: "create"},
{name: "TestWithoutSeparator", args: args{str: "tosca.interfaces.node.lifecycle.standard.create", separator: "_"}, expected: ""},
{name: "TestEmptyString", args: args{str: "", separator: "_"}, expected: ""},
}

for _, tt := range tests {
3 changes: 3 additions & 0 deletions prov/hostspool/consul_test.go
@@ -104,6 +104,9 @@ func TestRunConsulHostsPoolPackageTests(t *testing.T) {
t.Run("testConsulManagerAllocateShareableCompute", func(t *testing.T) {
testConsulManagerAllocateShareableCompute(t, client)
})
t.Run("testConsulManagerAllocateWithWeightBalancedPlacement", func(t *testing.T) {
testConsulManagerAllocateWithWeightBalancedPlacement(t, client)
})
t.Run("testConsulManagerAllocateShareableComputeWithSameAllocationPrefix", func(t *testing.T) {
testConsulManagerAllocateShareableComputeWithSameAllocationPrefix(t, client)
})
257 changes: 43 additions & 214 deletions prov/hostspool/executor.go
@@ -22,7 +22,6 @@ import (
"strings"
"sync"

"github.com/dustin/go-humanize"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror"
"github.com/mitchellh/mapstructure"
@@ -138,13 +137,6 @@ func (e *defaultExecutor) execDelegateHostsPool(
return nil
}

func setInstancesStateWithContextualLogs(ctx context.Context, op operationParameters, instances []string, state tosca.NodeState) {

for _, instance := range instances {
deployments.SetInstanceStateWithContextualLogs(events.AddLogOptionalFields(ctx, events.LogOptionalFields{events.InstanceID: instance}), op.deploymentID, op.nodeName, instance, state)
}
}

func (e *defaultExecutor) hostsPoolCreate(ctx context.Context,
cc *api.Client, cfg config.Configuration,
op operationParameters, allocatedResources map[string]string) error {
@@ -182,12 +174,43 @@ func (e *defaultExecutor) hostsPoolCreate(ctx context.Context,
}
}

placement, err := e.getPlacementPolicy(ctx, op, op.nodeName)
if err != nil {
return err
}

instances, err := tasks.GetInstances(ctx, op.taskID, op.deploymentID, op.nodeName)
if err != nil {
return err
}

return e.allocateHostsToInstances(ctx, instances, shareable, filters, op, allocatedResources)
return e.allocateHostsToInstances(ctx, instances, shareable, filters, op, allocatedResources, placement)
}

func (e *defaultExecutor) getPlacementPolicy(ctx context.Context, op operationParameters, target string) (string, error) {
placementPolicies, err := deployments.GetPoliciesForTypeAndNode(ctx, op.deploymentID, placementPolicy, target)
if err != nil {
return "", err
}

if len(placementPolicies) == 0 {
return "", nil
}

if len(placementPolicies) > 1 {
return "", errors.Errorf("Found more than one placement policy to apply to node name:%q", target)
}

policyType, err := deployments.GetPolicyType(ctx, op.deploymentID, placementPolicies[0])
if err != nil {
return "", err
}

if err = op.hpManager.CheckPlacementPolicy(policyType); err != nil {
return "", err
}

return policyType, nil
}

func (e *defaultExecutor) allocateHostsToInstances(
@@ -196,17 +219,19 @@ func (e *defaultExecutor) allocateHostsToInstances(
shareable bool,
filters []labelsutil.Filter,
op operationParameters,
allocatedResources map[string]string) error {
allocatedResources map[string]string,
placement string) error {

for _, instance := range instances {
ctx := events.AddLogOptionalFields(originalCtx, events.LogOptionalFields{events.InstanceID: instance})

allocation := &Allocation{
NodeName: op.nodeName,
Instance: instance,
DeploymentID: op.deploymentID,
Shareable: shareable,
Resources: allocatedResources}
NodeName: op.nodeName,
Instance: instance,
DeploymentID: op.deploymentID,
Shareable: shareable,
Resources: allocatedResources,
PlacementPolicy: placement}

// Protecting the allocation and update of resources labels by a mutex, to
// ensure no other worker will attempt to over-allocate resources of a
@@ -309,40 +334,6 @@ func (e *defaultExecutor) updateConnectionSettings(
return setInstanceAttributesFromLabels(ctx, op, instance, host.Labels)
}

func setInstanceAttributesValue(ctx context.Context, op operationParameters, instance, value string, attributes []string) error {
for _, attr := range attributes {
err := deployments.SetInstanceAttribute(ctx, op.deploymentID, op.nodeName, instance,
attr, value)
if err != nil {
return err
}
}
return nil
}

func setInstanceAttributesFromLabels(ctx context.Context, op operationParameters, instance string, labels map[string]string) error {
for label, value := range labels {
err := setAttributeFromLabel(ctx, op.deploymentID, op.nodeName, instance,
label, value, tosca.ComputeNodeNetworksAttributeName, tosca.NetworkNameProperty)
if err != nil {
return err
}
err = setAttributeFromLabel(ctx, op.deploymentID, op.nodeName, instance,
label, value, tosca.ComputeNodeNetworksAttributeName, tosca.NetworkIDProperty)
if err != nil {
return err
}
// This is bad as we split value even if we are not sure that it matches
err = setAttributeFromLabel(ctx, op.deploymentID, op.nodeName, instance,
label, strings.Split(value, ","), tosca.ComputeNodeNetworksAttributeName,
tosca.NetworkAddressesProperty)
if err != nil {
return err
}
}
return nil
}

func (e *defaultExecutor) getAllocatedResourcesFromHostCapabilities(ctx context.Context, deploymentID, nodeName string) (map[string]string, error) {
res := make(map[string]string, 0)
p, err := deployments.GetCapabilityPropertyValue(ctx, deploymentID, nodeName, "host", "num_cpus")
@@ -371,82 +362,6 @@ func (e *defaultExecutor) getAllocatedResourcesFromHostCapabilities(ctx context.
return res, nil
}

func appendCapabilityFilter(ctx context.Context, deploymentID, nodeName, capName, propName, op string, filters []labelsutil.Filter) ([]labelsutil.Filter, error) {
p, err := deployments.GetCapabilityPropertyValue(ctx, deploymentID, nodeName, capName, propName)
if err != nil {
return filters, err
}

hasProp, propDataType, err := deployments.GetCapabilityPropertyType(ctx, deploymentID, nodeName, capName, propName)
if err != nil {
return filters, err
}

if p != nil && p.RawString() != "" {
var sb strings.Builder
sb.WriteString(capName)
sb.WriteString(".")
sb.WriteString(propName)
sb.WriteString(" ")
sb.WriteString(op)
sb.WriteString(" ")
if hasProp && propDataType == "string" {
// Strings need to be quoted in filters
sb.WriteString("'")
sb.WriteString(p.RawString())
sb.WriteString("'")

} else {
sb.WriteString(p.RawString())
}

f, err := labelsutil.CreateFilter(sb.String())
if err != nil {
return filters, err
}
return append(filters, f), nil
}
return filters, nil
}

func createFiltersFromComputeCapabilities(ctx context.Context, deploymentID, nodeName string) ([]labelsutil.Filter, error) {
var err error
filters := make([]labelsutil.Filter, 0)
filters, err = appendCapabilityFilter(ctx, deploymentID, nodeName, "host", "num_cpus", ">=", filters)
if err != nil {
return nil, err
}
filters, err = appendCapabilityFilter(ctx, deploymentID, nodeName, "host", "cpu_frequency", ">=", filters)
if err != nil {
return nil, err
}
filters, err = appendCapabilityFilter(ctx, deploymentID, nodeName, "host", "disk_size", ">=", filters)
if err != nil {
return nil, err
}
filters, err = appendCapabilityFilter(ctx, deploymentID, nodeName, "host", "mem_size", ">=", filters)
if err != nil {
return nil, err
}
filters, err = appendCapabilityFilter(ctx, deploymentID, nodeName, "os", "architecture", "=", filters)
if err != nil {
return nil, err
}
filters, err = appendCapabilityFilter(ctx, deploymentID, nodeName, "os", "type", "=", filters)
if err != nil {
return nil, err
}
filters, err = appendCapabilityFilter(ctx, deploymentID, nodeName, "os", "distribution", "=", filters)
if err != nil {
return nil, err
}
filters, err = appendCapabilityFilter(ctx, deploymentID, nodeName, "os", "version", "=", filters)
if err != nil {
return nil, err
}
return filters, nil
}

func (e *defaultExecutor) hostsPoolDelete(originalCtx context.Context, cc *api.Client,
cfg config.Configuration, op operationParameters, allocatedResources map[string]string) error {
instances, err := tasks.GetInstances(originalCtx, op.taskID, op.deploymentID, op.nodeName)
@@ -471,96 +386,10 @@ func (e *defaultExecutor) hostsPoolDelete(originalCtx context.Context, cc *api.C
if err != nil {
errs = multierror.Append(errs, err)
}
return op.hpManager.UpdateResourcesLabels(op.location, hostname.RawString(), allocatedResources, add, updateResourcesLabels)

}
return errors.Wrap(errs, "errors encountered during hosts pool node release. Some hosts may not be properly released.")
}

func setAttributeFromLabel(ctx context.Context, deploymentID, nodeName, instance, label string, value interface{}, prefix, suffix string) error {
if strings.HasPrefix(label, prefix+".") && strings.HasSuffix(label, "."+suffix) {
attrName := strings.Replace(strings.Replace(label, prefix+".", prefix+"/", -1), "."+suffix, "/"+suffix, -1)
err := deployments.SetInstanceAttributeComplex(ctx, deploymentID, nodeName, instance, attrName, value)
err = op.hpManager.UpdateResourcesLabels(op.location, hostname.RawString(), allocatedResources, add, updateResourcesLabels)
if err != nil {
return err
}
}
return nil
}

func updateResourcesLabels(origin map[string]string, diff map[string]string, operation func(a int64, b int64) int64) (map[string]string, error) {
labels := make(map[string]string)

// Host Resources Labels can only be updated when deployment resources requirement is described
if cpusDiffStr, ok := diff["host.num_cpus"]; ok {
if cpusOriginStr, ok := origin["host.num_cpus"]; ok {
cpusOrigin, err := strconv.Atoi(cpusOriginStr)
if err != nil {
return nil, err
}
cpusDiff, err := strconv.Atoi(cpusDiffStr)
if err != nil {
return nil, err
}

res := operation(int64(cpusOrigin), int64(cpusDiff))
labels["host.num_cpus"] = strconv.Itoa(int(res))
}
}

if memDiffStr, ok := diff["host.mem_size"]; ok {
if memOriginStr, ok := origin["host.mem_size"]; ok {
memOrigin, err := humanize.ParseBytes(memOriginStr)
if err != nil {
return nil, err
}
memDiff, err := humanize.ParseBytes(memDiffStr)
if err != nil {
return nil, err
}

res := operation(int64(memOrigin), int64(memDiff))
labels["host.mem_size"] = formatBytes(res, isIECformat(memOriginStr))
}
}

if diskDiffStr, ok := diff["host.disk_size"]; ok {
if diskOriginStr, ok := origin["host.disk_size"]; ok {
diskOrigin, err := humanize.ParseBytes(diskOriginStr)
if err != nil {
return nil, err
}
diskDiff, err := humanize.ParseBytes(diskDiffStr)
if err != nil {
return nil, err
}

res := operation(int64(diskOrigin), int64(diskDiff))
labels["host.disk_size"] = formatBytes(res, isIECformat(diskOriginStr))
errs = multierror.Append(errs, err)
}
}

return labels, nil
}

func add(valA int64, valB int64) int64 {
return valA + valB
}

func subtract(valA int64, valB int64) int64 {
return valA - valB
}

func formatBytes(value int64, isIEC bool) string {
if isIEC {
return humanize.IBytes(uint64(value))
}
return humanize.Bytes(uint64(value))
}

func isIECformat(value string) bool {
if value != "" && strings.HasSuffix(value, "iB") {
return true
}
return false
return errors.Wrap(errs, "errors encountered during hosts pool node release. Some hosts may not be properly released.")
}
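
The helpers removed above (updateResourcesLabels, add, subtract, formatBytes and isIECformat) show how host resource labels such as host.num_cpus and host.mem_size are adjusted when an instance is allocated or released. The following standalone sketch reproduces that bookkeeping for two labels, assuming the same github.com/dustin/go-humanize calls; names and values are illustrative, not the relocated yorc code.

package main

import (
	"fmt"
	"strconv"

	"github.com/dustin/go-humanize"
)

// adjustLabels returns updated host.num_cpus and host.mem_size labels after
// applying op to the original values and the values required by a deployment
// (subtraction on allocation, addition on release).
func adjustLabels(origin, diff map[string]string, op func(a, b int64) int64) (map[string]string, error) {
	labels := make(map[string]string)

	if cpusDiff, ok := diff["host.num_cpus"]; ok {
		if cpusOrigin, ok := origin["host.num_cpus"]; ok {
			a, err := strconv.Atoi(cpusOrigin)
			if err != nil {
				return nil, err
			}
			b, err := strconv.Atoi(cpusDiff)
			if err != nil {
				return nil, err
			}
			labels["host.num_cpus"] = strconv.Itoa(int(op(int64(a), int64(b))))
		}
	}

	if memDiff, ok := diff["host.mem_size"]; ok {
		if memOrigin, ok := origin["host.mem_size"]; ok {
			a, err := humanize.ParseBytes(memOrigin)
			if err != nil {
				return nil, err
			}
			b, err := humanize.ParseBytes(memDiff)
			if err != nil {
				return nil, err
			}
			// humanize.IBytes renders IEC units (GiB), matching the original label format.
			labels["host.mem_size"] = humanize.IBytes(uint64(op(int64(a), int64(b))))
		}
	}
	return labels, nil
}

func main() {
	origin := map[string]string{"host.num_cpus": "8", "host.mem_size": "32 GiB"}
	required := map[string]string{"host.num_cpus": "2", "host.mem_size": "4 GiB"}
	subtract := func(a, b int64) int64 { return a - b }
	updated, err := adjustLabels(origin, required, subtract)
	if err != nil {
		panic(err)
	}
	fmt.Println(updated) // map[host.mem_size:28 GiB host.num_cpus:6]
}
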
1 change: 1 addition & 0 deletions prov/hostspool/executor_test.go
@@ -231,6 +231,7 @@ func routineExecDelegate(ctx context.Context, e *defaultExecutor, cc *api.Client
delegateOperation: delegateOperation,
hpManager: hpManager,
}

err := e.execDelegateHostsPool(ctx, cc, cfg, operationParams)
if err != nil {
fmt.Printf("Error executing operation %s on node %s: %s\n", delegateOperation, nodeName, err.Error())
1 change: 1 addition & 0 deletions prov/hostspool/hostspool_mgr.go
@@ -59,6 +59,7 @@ type Manager interface {
Release(locationName, hostname string, allocation *Allocation) error
ListLocations() ([]string, error)
RemoveLocation(locationName string) error
CheckPlacementPolicy(placementPolicy string) error
}

// SSHClientFactory is a factory function that can be called to customize the client used to check the connection.
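
The new CheckPlacementPolicy method added to the Manager interface is what getPlacementPolicy in executor.go calls to reject unsupported policy types before allocation. Its concrete implementation is not shown in this diff; a plausible sketch, assuming the manager simply accepts the two concrete policy types declared in yorc-hostspool-types.yml, could look like this (illustrative only):

package hostspool

import "fmt"

// checkPlacementPolicy is an illustrative sketch of the CheckPlacementPolicy
// contract: it accepts the two concrete placement policy types declared in
// yorc-hostspool-types.yml and rejects anything else. The real implementation
// may instead check derivation from yorc.policies.hostspool.Placement.
func checkPlacementPolicy(placementPolicy string) error {
	switch placementPolicy {
	case "yorc.policies.hostspool.WeightBalancedPlacement",
		"yorc.policies.hostspool.BinPackingPlacement":
		return nil
	default:
		return fmt.Errorf("unsupported hosts pool placement policy %q", placementPolicy)
	}
}
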
