Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Support node selector constrains [1.5.x backport] (#2227)
  • Loading branch information
liranbg committed Jun 15, 2021
1 parent 7b4eb22 commit d9e8bd1
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 0 deletions.
Expand Up @@ -93,11 +93,16 @@ The `spec` section contains the requirements and attributes and has the followin
| runtimeAttributes | See [reference](/docs/reference/runtimes/) | Runtime-specific attributes |
| resources | See [reference](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/) | Limit resources allocated to deployed function |
| readinessTimeoutSeconds | int | Number of seconds that the controller will wait for the function to become ready before declaring failure (default: 60) |
| waitReadinessTimeoutBeforeFailure | bool | Wait for the expiration of the readiness timeout period even if the deployment fails or isn't expected to complete before the readinessTimeout expires |
| avatar | string | Base64 representation of an icon to be shown in UI for the function |
| eventTimeout | string | Global event timeout, in the format supported for the `Duration` parameter of the [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration) Go function |
| securityContext.runAsUser | int | The user ID (UID) for runing the entry point of the container process |
| securityContext.runAsGroup | int | The group ID (GID) for running the entry point of the container process |
| securityContext.fsGroup | int | A supplemental group to add and use for running the entry point of the container process |
| serviceType | string | Describes ingress methods for a service |
| affinity | v1.Affinity | Set of rules used to determine the node that schedule the pod |
| nodeSelector | map | Constrain function pod to a node by key-value pairs selectors |
| nodeName | string | Constrain function pod to a node by node name |

<a id="spec-example"></a>
### Example
Expand Down
6 changes: 6 additions & 0 deletions pkg/functionconfig/types.go
Expand Up @@ -260,6 +260,12 @@ type Spec struct {
ServiceAccount string `json:"serviceAccount,omitempty"`
ScaleToZero *ScaleToZeroSpec `json:"scaleToZero,omitempty"`

// Run function on a particular set of node(s)
// https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/
Affinity *v1.Affinity `json:"affinity,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
NodeName string `json:"nodeName,omitempty"`

// Currently relevant only for k8s platform
// if true - wait the whole ReadinessTimeoutSeconds before marking this function as unhealthy
// otherwise, fail the function instantly when there is indication of deployment failure (e.g. pod stuck on crash
Expand Down
18 changes: 18 additions & 0 deletions pkg/nuctl/command/deploy.go
Expand Up @@ -69,6 +69,8 @@ type deployCommandeer struct {
replicas int
minReplicas int
maxReplicas int
nodeName string
encodedNodeSelector string
runAsUser int64
runAsGroup int64
fsGroup int64
Expand Down Expand Up @@ -196,6 +198,8 @@ func addDeployFlags(cmd *cobra.Command,

cmd.Flags().StringVar(&commandeer.description, "desc", "", "Function description")
cmd.Flags().StringVarP(&commandeer.encodedLabels, "labels", "l", "", "Additional function labels (lbl1=val1[,lbl2=val2,...])")
cmd.Flags().StringVar(&commandeer.encodedNodeSelector, "nodeSelector", "", "Run function pod on a Node by key=value selection constraints (key1=val1[,key2=val2,...])")
cmd.Flags().StringVar(&commandeer.nodeName, "nodeName", "", "Run function pod on a Node by name-matching selection constrain")
cmd.Flags().VarP(&commandeer.encodedEnv, "env", "e", "Environment variables env1=val1")
cmd.Flags().BoolVarP(&commandeer.disable, "disable", "d", false, "Start the function as disabled (don't run yet)")
cmd.Flags().IntVarP(&commandeer.replicas, "replicas", "", -1, "Set to any non-negative integer to use a static number of replicas")
Expand Down Expand Up @@ -386,6 +390,10 @@ func (d *deployCommandeer) enrichConfigWithStringArgs() {
{Level: d.loggerLevel},
}
}

if d.nodeName != "" {
d.functionConfig.Spec.NodeName = d.nodeName
}
}

func (d *deployCommandeer) enrichConfigWithBoolArgs() {
Expand Down Expand Up @@ -526,6 +534,16 @@ func (d *deployCommandeer) enrichConfigWithComplexArgs() error {
d.functionConfig.Meta.Labels[label] = labelValue
}

// decode node selector
if d.encodedNodeSelector != "" {
if d.functionConfig.Spec.NodeSelector == nil {
d.functionConfig.Spec.NodeSelector = map[string]string{}
}
for key, value := range common.StringToStringMap(d.encodedNodeSelector, "=") {
d.functionConfig.Spec.NodeSelector[key] = value
}
}

// if the project name was set, add it as a label (not in string enrichment, because it's part of the labels)
if d.projectName != "" {
d.functionConfig.Meta.Labels["nuclio.io/project-name"] = d.projectName
Expand Down
7 changes: 7 additions & 0 deletions pkg/platform/kube/functionres/lazy.go
Expand Up @@ -753,6 +753,9 @@ func (lc *lazyClient) createOrUpdateDeployment(functionLabels labels.Set,
Volumes: volumes,
ServiceAccountName: function.Spec.ServiceAccount,
SecurityContext: function.Spec.SecurityContext,
Affinity: function.Spec.Affinity,
NodeSelector: function.Spec.NodeSelector,
NodeName: function.Spec.NodeName,
},
},
}
Expand Down Expand Up @@ -813,6 +816,10 @@ func (lc *lazyClient) createOrUpdateDeployment(functionLabels labels.Set,
deployment.Spec.Template.Spec.ServiceAccountName = function.Spec.ServiceAccount
}

deployment.Spec.Template.Spec.Affinity = function.Spec.Affinity
deployment.Spec.Template.Spec.NodeSelector = function.Spec.NodeSelector
deployment.Spec.Template.Spec.NodeName = function.Spec.NodeName

// enrich deployment spec with default fields that were passed inside the platform configuration
// performed on update too, in case the platform config has been modified after the creation of this deployment
if err := lc.enrichDeploymentFromPlatformConfiguration(function, deployment, method); err != nil {
Expand Down
39 changes: 39 additions & 0 deletions pkg/platform/kube/functionres/lazy_test.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package functionres

import (
"context"
"testing"

"github.com/nuclio/nuclio/pkg/functionconfig"
Expand Down Expand Up @@ -68,6 +69,44 @@ func (suite *lazyTestSuite) SetupTest() {
})
}

func (suite *lazyTestSuite) TestNodeConstrains() {
functionInstance := &nuclioio.NuclioFunction{}
functionInstance.Name = "func-name"
functionInstance.Spec.NodeName = "some-node-name"
functionInstance.Spec.NodeSelector = map[string]string{
"some-key": "some-value",
}
functionInstance.Spec.Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "req-key",
Values: []string{
"a",
"b",
},
},
},
},
},
},
},
}
resources, err := suite.client.CreateOrUpdate(context.TODO(), functionInstance, "")
suite.Require().NoError(err)
suite.Require().NotEmpty(resources)
deployment, err := resources.Deployment()
suite.Require().NoError(err)

// ensure fields were passed
deployment.Spec.Template.Spec.NodeName = functionInstance.Spec.NodeName
deployment.Spec.Template.Spec.NodeSelector = functionInstance.Spec.NodeSelector
deployment.Spec.Template.Spec.Affinity = functionInstance.Spec.Affinity
}

func (suite *lazyTestSuite) TestNoChanges() {
one := 1
function := nuclioio.NuclioFunction{
Expand Down
100 changes: 100 additions & 0 deletions pkg/platform/kube/test/platform_test.go
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/nuclio/nuclio/pkg/platform/kube/ingress"
"github.com/nuclio/nuclio/pkg/platformconfig"

"github.com/gobuffalo/flect"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/nuclio/errors"
Expand All @@ -43,6 +44,7 @@ import (
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

type DeployFunctionTestSuite struct {
Expand Down Expand Up @@ -306,6 +308,104 @@ func (suite *DeployFunctionTestSuite) TestSecurityContext() {
})
}

func (suite *DeployFunctionTestSuite) TestAssigningFunctionPodToNodes() {

// TODO: currently is not working on minikube
suite.T().Skip("Run manually")
existingNodes := suite.GetNodes()
suite.Require().NotEmpty(existingNodes, "Must have at least one node available")

testNodeName := existingNodes[0].GetName()
testLabelKey := "test-nuclio.io"

labelPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/labels/%s","value":"%s"}]`, testLabelKey, "true")
_, err := suite.KubeClientSet.CoreV1().Nodes().Patch(testNodeName, types.JSONPatchType, []byte(labelPatch))
suite.Require().NoError(err, "Failed to patch node labels")

// undo changes
defer func() {
suite.Logger.DebugWith("Rolling back node labels change")
labelPatch = fmt.Sprintf(`[{"op":"remove","path":"/metadata/labels/%s"}]`, testLabelKey)
_, err := suite.KubeClientSet.CoreV1().Nodes().Patch(testNodeName, types.JSONPatchType, []byte(labelPatch))
suite.Require().NoError(err, "Failed to patch node labels")
}()

for _, testCase := range []struct {
name string

// function spec
nodeName string
nodeSelector map[string]string
nodeAffinity *v1.Affinity
expectedFailure bool
}{
{
name: "AssignByNodeName",
nodeName: testNodeName,
},
{
name: "UnscheduledNoSuchNodeName",
nodeName: "nuclio-do-not-should-not-exists",
expectedFailure: true,
},
{
name: "AssignByNodeSelector",
nodeSelector: map[string]string{
testLabelKey: "true",
},
},
{
name: "UnscheduledNoSuchLabel",
nodeSelector: map[string]string{
testLabelKey: "false",
},
expectedFailure: true,
},
} {
suite.Run(testCase.name, func() {
functionName := flect.Dasherize(testCase.name)
createFunctionOptions := suite.CompileCreateFunctionOptions(functionName)
createFunctionOptions.FunctionConfig.Spec.NodeName = testCase.nodeName
createFunctionOptions.FunctionConfig.Spec.Affinity = testCase.nodeAffinity
createFunctionOptions.FunctionConfig.Spec.NodeSelector = testCase.nodeSelector
if testCase.expectedFailure {

// dont wait for too long
createFunctionOptions.FunctionConfig.Spec.ReadinessTimeoutSeconds = 15
_, err := suite.DeployFunctionExpectError(createFunctionOptions,
func(deployResult *platform.CreateFunctionResult) bool {
return true
})
suite.Require().Error(err)

if testCase.nodeSelector != nil {
pods := suite.GetFunctionPods(functionName)
podEvents, err := suite.KubeClientSet.CoreV1().Events(suite.Namespace).List(metav1.ListOptions{
FieldSelector: fmt.Sprintf("involvedObject.name=%s", pods[0].GetName()),
})
suite.Require().NoError(err)
suite.Require().NotNil(podEvents)
suite.Require().Equal("FailedScheduling", podEvents.Items[0].Reason)
}
} else {
suite.DeployFunction(createFunctionOptions, func(deployResult *platform.CreateFunctionResult) bool {
pods := suite.GetFunctionPods(functionName)
if testCase.nodeName != "" {
suite.Require().Equal(testCase.nodeName, pods[0].Spec.NodeName)
}
if testCase.nodeSelector != nil {
suite.Require().Equal(testCase.nodeSelector, pods[0].Spec.NodeSelector)
}
if testCase.nodeAffinity != nil {
suite.Require().Equal(testCase.nodeAffinity, pods[0].Spec.Affinity)
}
return true
})
}
})
}
}

func (suite *DeployFunctionTestSuite) TestAugmentedConfig() {
runAsUserID := int64(1000)
runAsGroupID := int64(2000)
Expand Down
22 changes: 22 additions & 0 deletions pkg/platform/kube/test/suite.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package test

import (
"context"
"encoding/base64"
"fmt"
"strings"
Expand Down Expand Up @@ -271,6 +272,27 @@ func (suite *KubeTestSuite) GetFunctionPods(functionName string) []v1.Pod {
return pods.Items
}

func (suite *KubeTestSuite) GetNodes() []v1.Node {
nodesList, err := suite.KubeClientSet.CoreV1().Nodes().List(metav1.ListOptions{})
suite.Require().NoError(err)
return nodesList.Items
}

func (suite *KubeTestSuite) DeleteFunctionPods(functionName string) {
errGroup, _ := errgroup.WithContext(context.TODO())
for _, pod := range suite.GetFunctionPods(functionName) {
pod := pod
errGroup.Go(func() error {
suite.Logger.DebugWith("Deleting function pod", "podName", pod.Name)
return suite.KubeClientSet.
CoreV1().
Pods(suite.Namespace).
Delete(pod.Name, metav1.NewDeleteOptions(0))
})
}
suite.Require().NoError(errGroup.Wait(), "Failed to delete function pods")
}

func (suite *KubeTestSuite) GetResourceAndUnmarshal(resourceKind, resourceName string, resource interface{}) {
resourceContent := suite.getResource(resourceKind, resourceName)
err := yaml.Unmarshal([]byte(resourceContent), resource)
Expand Down

0 comments on commit d9e8bd1

Please sign in to comment.