update affinity assistant creation implementation
Before this commit, the affinity assistant was created at the beginning of
the pipelineRun, and that same affinity assistant was relied on for the
entire lifecycle of a PR. If the node on which the affinity assistant pod
is created goes down, the rest of the pipelineRun is stuck and cannot run
to completion, since the affinity assistant (StatefulSet) tries to schedule
the rest of the pods (taskRuns) on the same node, but that node is cordoned
or no longer scheduling anything new.

This commit always attempts to create the Affinity Assistant (StatefulSet)
in case it does not exist. If it exists, the controller checks whether the
node on which the Affinity Assistant pod is scheduled is healthy enough to
schedule subsequent pods. If not, the controller deletes the Affinity
Assistant pod so that the StatefulSet can bring its replicas (set to 1)
back up on another node in the cluster.

Signed-off-by: Priti Desai <pdesai@us.ibm.com>
pritidesai committed May 9, 2023
1 parent 77c1698 commit f19cef0
Showing 4 changed files with 170 additions and 11 deletions.
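To make the change easier to follow before diving into the per-file diffs, here is a minimal, self-contained sketch of the check this commit adds on every reconcile. The function name and the kube/statefulSetName parameters are illustrative, not part of the commit; the real logic lives in createAffinityAssistants in pkg/reconciler/pipelinerun/affinity_assistant.go below.

package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// ensureAssistantSchedulable finds the affinity assistant pod via its
// app.kubernetes.io/instance label, inspects the node it landed on, and
// deletes the pod when that node is cordoned, so the owning StatefulSet
// (replicas=1) can recreate it on a schedulable node.
func ensureAssistantSchedulable(ctx context.Context, kube kubernetes.Interface, statefulSetName string) error {
	pods, err := kube.CoreV1().Pods("").List(ctx, metav1.ListOptions{
		LabelSelector: "app.kubernetes.io/instance=" + statefulSetName,
	})
	if err != nil || len(pods.Items) == 0 {
		return fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", statefulSetName, err)
	}
	// with replicas set to 1 there is at most one pod to inspect
	for _, pod := range pods.Items {
		node, err := kube.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("could not get node %q: %w", pod.Spec.NodeName, err)
		}
		if node.Spec.Unschedulable {
			// deleting the pod lets the StatefulSet bring the replica back on another node
			if err := kube.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
				return fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", pod.Name, pod.Namespace, err)
			}
		}
	}
	return nil
}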
4 changes: 4 additions & 0 deletions config/200-clusterrole.yaml
@@ -25,6 +25,10 @@ rules:
# Controller needs to watch Pods created by TaskRuns to see them progress.
resources: ["pods"]
verbs: ["list", "watch"]
- apiGroups: [""]
# Controller needs to watch nodes for their health over the course of a single run
resources: ["nodes"]
verbs: ["get", "list", "watch"]
# Controller needs cluster access to all of the CRDs that it is responsible for
# managing.
- apiGroups: ["tekton.dev"]
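The rule above is what authorizes the node reads the controller now performs. A small sketch of the corresponding client-go call (the function and its parameters are illustrative; only the get verb is exercised by the new reconcile logic in the next file, while list and watch cover the broader node watching the comment describes):

package sketch

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// nodeIsCordoned needs only the "get" verb on nodes granted above.
func nodeIsCordoned(ctx context.Context, kube kubernetes.Interface, nodeName string) (bool, error) {
	node, err := kube.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return false, err
	}
	return node.Spec.Unschedulable, nil
}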
32 changes: 30 additions & 2 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
@@ -56,9 +56,10 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1.
for _, w := range wb {
if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil {
affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
claimName := getClaimName(w, *kmeta.NewControllerRef(pr))
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimName, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
@@ -68,6 +69,33 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1.
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
}
// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
case err == nil && a != nil && a.Status.ReadyReplicas == 1:
// get the list of pods associated with a given StatefulSet based on the following label:
// app.kubernetes.io/instance=affinity-assistant-641c8a0038
pods, err := c.KubeClientSet.CoreV1().Pods("").List(ctx, metav1.ListOptions{
LabelSelector: workspace.LabelInstance + "=" + a.Name,
})
if err != nil || len(pods.Items) == 0 {
return fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err)
}
// iterate over the list of pods - at most there can be only one pod as the requested replicas is set to 1
for _, pod := range pods.Items {
// get the node based on the affinity assistant pod - pod.Spec.nodeName
n, err := c.KubeClientSet.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("could not get the node \"%s\" on which affinity assistant pod is created, err: %w", pod.Spec.NodeName, err)
}
// node which hosts the affinity assistant pod is unschedulable or cordoned for some reason
// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
if n.Spec.Unschedulable {
err = c.KubeClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", pod.Name, pod.Namespace, err)
}
}
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
@@ -107,7 +135,7 @@ func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.
func getAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string {
hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName))
hashString := fmt.Sprintf("%x", hashBytes)
return fmt.Sprintf("%s-%s", "affinity-assistant", hashString[:10])
return fmt.Sprintf("%s-%s", workspace.ComponentNameAffinityAssistant, hashString[:10])
}

func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string) map[string]string {
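As a side note, the assistant name seen in labels like affinity-assistant-641c8a0038 comes from getAffinityAssistantName above: "affinity-assistant" plus the first ten hex characters of the SHA-256 of the workspace name concatenated with the PipelineRun name. A standalone sketch that mirrors it:

package sketch

import (
	"crypto/sha256"
	"fmt"
)

// affinityAssistantName mirrors getAffinityAssistantName from the diff above.
func affinityAssistantName(pipelineWorkspaceName, pipelineRunName string) string {
	hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName))
	hashString := fmt.Sprintf("%x", hashBytes)
	return fmt.Sprintf("%s-%s", "affinity-assistant", hashString[:10])
}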
125 changes: 125 additions & 0 deletions pkg/reconciler/pipelinerun/affinity_assistant_test.go
@@ -18,6 +18,7 @@ package pipelinerun

import (
"context"
"encoding/json"
"testing"

"github.com/google/go-cmp/cmp"
@@ -28,9 +29,11 @@ import (
"github.com/tektoncd/pipeline/pkg/workspace"
"github.com/tektoncd/pipeline/test/diff"
"github.com/tektoncd/pipeline/test/parse"
"gomodules.xyz/jsonpatch/v2"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
fakek8s "k8s.io/client-go/kubernetes/fake"
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/system"
@@ -88,6 +91,128 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) {
}
}

// TestCreateAffinityAssistantWhenNodeIsCordoned tests that an existing Affinity Assistant can identify node failure and
// migrate the affinity assistant pod to a healthy node
func TestCreateAffinityAssistantWhenNodeIsCordoned(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

c := Reconciler{
KubeClientSet: fakek8s.NewSimpleClientset(),
Images: pipeline.Images{},
}

workspaceName := "testws"
pipelineRunName := "pipelinerun-1"
testPipelineRun := &v1beta1.PipelineRun{
TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"},
ObjectMeta: metav1.ObjectMeta{
Name: pipelineRunName,
},
Spec: v1beta1.PipelineRunSpec{
Workspaces: []v1beta1.WorkspaceBinding{{
Name: workspaceName,
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: "myclaim",
},
}},
},
}

// create affinity assistant in a cluster
err := c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace)
if err != nil {
t.Errorf("unexpected error from createAffinityAssistants: %v", err)
}

// get the affinity assistant just created
expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name)

// assume the necessary pod is created, update the affinity assistant with the number of pods in readyReplicas
var replaceReplicasPatchBytes []byte
replaceReplicasPatchBytes, err = json.Marshal([]jsonpatch.JsonPatchOperation{{
Operation: "replace",
Path: "/status/readyReplicas",
Value: int32(1),
}})
_, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Patch(ctx, expectedAffinityAssistantName, types.JSONPatchType, replaceReplicasPatchBytes, metav1.PatchOptions{})
if err != nil {
t.Errorf("unexpected error updating StatefulSet: %v", err)
}

// add a cordoned node to the list of nodes
c.KubeClientSet.CoreV1().Nodes().Create(ctx, &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "soon-to-be-cordoned-node",
},
Spec: corev1.NodeSpec{
Unschedulable: true,
},
}, metav1.CreateOptions{})

// Test 1: createAffinityAssistants must return an error when the StatefulSet has the readyReplicas set to 1 without any pod with the necessary label created
err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace)
if err == nil {
t.Error("expected error from createAffinityAssistants, but no error was reported")
}

// now, define a pod with the affinity assistant labels and owner references
p := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: expectedAffinityAssistantName + "-0",
Labels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
workspace.LabelInstance: expectedAffinityAssistantName,
"statefulset.kubernetes.io/pod-name": expectedAffinityAssistantName + "-0",
},
OwnerReferences: []metav1.OwnerReference{{
APIVersion: "apps/v1",
Kind: "StatefulSet",
Name: expectedAffinityAssistantName,
}},
},
}
// create an affinity-assistant pod with necessary labels
_, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Create(ctx, &corev1.Pod{
ObjectMeta: p.ObjectMeta,
Spec: corev1.PodSpec{
NodeName: "soon-to-be-cordoned-node",
},
}, metav1.CreateOptions{})
if err != nil {
t.Errorf("unexpected error from creating a pod for AffinityAssistant: %v", err)
}

// Test 2: createAffinityAssistants must delete an affinity assistant pod since the node on which it's scheduled is marked as unschedulable
err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace)
if err != nil {
t.Errorf("unexpected error from createAffinityAssistants: %v", err)
}
// validation of Test 2, the affinity assistant pod must be deleted with an affinity assistant running on a cordoned node
_, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName+"-0", metav1.GetOptions{})
if !apierrors.IsNotFound(err) {
t.Errorf("expected a NotFound response, got: %v", err)
}

// Test 3: createAffinityAssistants must return an error when the affinity assistant pod has a nodeName which does not exist
// create an affinity-assistant pod with necessary labels
_, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Create(ctx, &corev1.Pod{
ObjectMeta: p.ObjectMeta,
Spec: corev1.PodSpec{
NodeName: "missing-node",
},
}, metav1.CreateOptions{})
if err != nil {
t.Errorf("unexpected error from creating a pod for AffinityAssistant: %v", err)
}
// validation of Test 3, try to create affinity assistant again with an existing StatefulSet, affinity assistant pod, and missing node
err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace)
if err == nil {
t.Error("expected error from createAffinityAssistants, but no error was reported")
}
}

func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
prWithCustomPodTemplate := &v1beta1.PipelineRun{
TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"},
20 changes: 11 additions & 9 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -609,16 +609,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
return controller.NewPermanentError(err)
}
}
}

if !c.isAffinityAssistantDisabled(ctx) {
// create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity
if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil {
logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err)
pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet,
"Failed to create StatefulSet for PipelineRun %s/%s correctly: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}
// Make an attempt to create Affinity Assistant if it does not exist
// if the Affinity Assistant already exists, handle the possibility of assigned node becoming unschedulable by deleting the pod
if !c.isAffinityAssistantDisabled(ctx) {
// create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity
if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil {
logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err)
pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet,
"Failed to create StatefulSet for PipelineRun %s/%s correctly: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}
}


