Skip to content

Commit

Permalink
Merge pull request #3319 from abayer/pipelineruninfo-api-call-removal
Browse files Browse the repository at this point in the history
fix: Drastically decrease API calls for PipelineRunInfo creation
  • Loading branch information
jenkins-x-bot committed Mar 9, 2019
2 parents 089554b + d7a9841 commit c62422a
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 41 deletions.
20 changes: 19 additions & 1 deletion pkg/jx/cmd/controller_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,25 @@ func (o *ControllerBuildOptions) onPipelinePod(obj interface{}, kubeClient kuber
if pod.Labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] != "" {
if pod.Labels[syntax.LabelStageName] != "" {
prName := pod.Labels[pipeline.GroupName+pipeline.PipelineRunLabelKey]
pri, err := tekton.CreatePipelineRunInfo(kubeClient, tektonClient, jxClient, ns, prName)
pr, err := tektonClient.TektonV1alpha1().PipelineRuns(ns).Get(prName, metav1.GetOptions{})
if err != nil {
log.Warnf("Error getting PipelineRun for name %s: %s\n", prName, err)
return
}
// Get the Pod for this PipelineRun
podList, err := kubeClient.CoreV1().Pods(ns).List(metav1.ListOptions{
LabelSelector: builds.LabelPipelineRunName + "=" + prName,
})
if err != nil {
log.Warnf("Error getting PodList for PipelineRun %s: %s\n", prName, err)
return
}
structure, err := jxClient.JenkinsV1().PipelineStructures(ns).Get(prName, metav1.GetOptions{})
if err != nil {
log.Warnf("Error getting PipelineStructure for PipelineRun %s: %s\n", prName, err)
return
}
pri, err := tekton.CreatePipelineRunInfo(prName, podList, structure, pr)
if err != nil {
log.Warnf("Error creating PipelineRunInfo for PipelineRun %s: %s\n", prName, err)
return
Expand Down
38 changes: 35 additions & 3 deletions pkg/jx/cmd/get_build_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/jenkins-x/jx/pkg/gits"
"github.com/jenkins-x/jx/pkg/kube"
"github.com/jenkins-x/jx/pkg/tekton"
"github.com/knative/build-pipeline/pkg/apis/pipeline"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -320,7 +321,19 @@ func (o *GetBuildLogsOptions) getProwBuildLog(kubeClient kubernetes.Interface, t
if stage.Pod == nil {
// The stage's pod hasn't been created yet, so let's wait a bit.
f := func() error {
if err := stage.SetPodsForStageInfo(kubeClient, tektonClient, ns, pr.PipelineRun); err != nil {
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{
pipeline.GroupName + pipeline.PipelineRunLabelKey: pr.PipelineRun,
}})
if err != nil {
return err
}
podList, err := kubeClient.CoreV1().Pods(ns).List(metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
return err
}
if err := stage.SetPodsForStageInfo(podList, pr.PipelineRun); err != nil {
return err
}

Expand Down Expand Up @@ -490,15 +503,33 @@ func (o *GetBuildLogsOptions) loadPipelines(kubeClient kubernetes.Interface, tek
pipelineMap := map[string]builds.BaseBuildInfo{}

prList, err := tektonClient.TektonV1alpha1().PipelineRuns(ns).List(metav1.ListOptions{})

if err != nil {
log.Warnf("Failed to query PipelineRuns %s\n", err)
return names, defaultName, buildMap, pipelineMap, err
}

structures, err := jxClient.JenkinsV1().PipelineStructures(ns).List(metav1.ListOptions{})
if err != nil {
log.Warnf("Failed to query PipelineStructures %s\n", err)
return names, defaultName, buildMap, pipelineMap, err
}

buildInfos := []*tekton.PipelineRunInfo{}

podList, err := kubeClient.CoreV1().Pods(ns).List(metav1.ListOptions{
LabelSelector: pipeline.GroupName + pipeline.PipelineRunLabelKey,
})
if err != nil {
return names, defaultName, buildMap, pipelineMap, err
}
for _, pr := range prList.Items {
pri, err := tekton.CreatePipelineRunInfo(kubeClient, tektonClient, jxClient, ns, pr.Name)
var ps *v1.PipelineStructure
for _, p := range structures.Items {
if p.Name == pr.Name {
ps = &p
}
}
pri, err := tekton.CreatePipelineRunInfo(pr.Name, podList, ps, &pr)
if err != nil {
if o.Verbose {
log.Warnf("Error creating PipelineRunInfo for PipelineRun %s: %s\n", pr.Name, err)
Expand All @@ -508,6 +539,7 @@ func (o *GetBuildLogsOptions) loadPipelines(kubeClient kubernetes.Interface, tek
buildInfos = append(buildInfos, pri)
}
}

tekton.SortPipelineRunInfos(buildInfos)
if len(buildInfos) == 0 {
return names, defaultName, buildMap, pipelineMap, fmt.Errorf("no Tekton pipelines have been triggered which match the current filter")
Expand Down
55 changes: 23 additions & 32 deletions pkg/tekton/pipeline_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/jenkins-x/jx/pkg/tekton/syntax"
"github.com/jenkins-x/jx/pkg/util"
"github.com/knative/build-pipeline/pkg/apis/pipeline"
tektonclient "github.com/knative/build-pipeline/pkg/client/clientset/versioned"
tektonv1alpha1 "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -179,7 +179,7 @@ func (si *StageInfo) getOrderedTaskStagesForStage() []*StageInfo {
}

// CreatePipelineRunInfo looks up the PipelineRun for a given name and creates the PipelineRunInfo for it
func CreatePipelineRunInfo(kubeClient kubernetes.Interface, tektonClient tektonclient.Interface, jxClient versioned.Interface, ns, prName string) (*PipelineRunInfo, error) {
func CreatePipelineRunInfo(prName string, podList *corev1.PodList, ps *v1.PipelineStructure, pr *tektonv1alpha1.PipelineRun) (*PipelineRunInfo, error) {
branch := ""
lastCommitSha := ""
lastCommitMessage := ""
Expand All @@ -193,9 +193,8 @@ func CreatePipelineRunInfo(kubeClient kubernetes.Interface, tektonClient tektonc
}
gitURL := ""

pr, err := tektonClient.TektonV1alpha1().PipelineRuns(ns).Get(prName, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrapf(err, fmt.Sprintf("PipelineRun %s cannot be found", prName))
if pr == nil {
return nil, errors.New(fmt.Sprintf("PipelineRun %s cannot be found", prName))
}

pri := &PipelineRunInfo{
Expand All @@ -207,7 +206,7 @@ func CreatePipelineRunInfo(kubeClient kubernetes.Interface, tektonClient tektonc
var pod *corev1.Pod

prStatus := pr.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
if err := pri.SetPodsForPipelineRun(kubeClient, tektonClient, jxClient, ns); err != nil {
if err := pri.SetPodsForPipelineRun(podList, ps); err != nil {
return nil, errors.Wrapf(err, "Failure populating stages and pods for PipelineRun %s", prName)
}

Expand Down Expand Up @@ -329,14 +328,11 @@ func CreatePipelineRunInfo(kubeClient kubernetes.Interface, tektonClient tektonc
}

// SetPodsForPipelineRun populates the pods for all stages within its PipelineRunInfo
func (pri *PipelineRunInfo) SetPodsForPipelineRun(kubeClient kubernetes.Interface, tektonClient tektonclient.Interface, jxClient versioned.Interface, ns string) error {
func (pri *PipelineRunInfo) SetPodsForPipelineRun(podList *corev1.PodList, ps *v1.PipelineStructure) error {
if pri.PipelineRun == "" {
return errors.New("No PipelineRun specified")
}
ps, err := getPipelineStructureForPipelineRun(jxClient, ns, pri.PipelineRun)
if err != nil {
return err
}

if ps == nil {
return errors.New(fmt.Sprintf("Could not find PipelineStructure for PipelineRun %s", pri.PipelineRun))
}
Expand All @@ -353,7 +349,7 @@ func (pri *PipelineRunInfo) SetPodsForPipelineRun(kubeClient kubernetes.Interfac
if firstTaskStage == nil {
firstTaskStage = si
}
if err := si.SetPodsForStageInfo(kubeClient, tektonClient, ns, pri.PipelineRun); err != nil {
if err := si.SetPodsForStageInfo(podList, pri.PipelineRun); err != nil {
return errors.Wrapf(err, "Couldn't populate Pods for Stages")
}
}
Expand All @@ -362,29 +358,24 @@ func (pri *PipelineRunInfo) SetPodsForPipelineRun(kubeClient kubernetes.Interfac
}

// SetPodsForStageInfo populates the pods for a particular stage and/or its children
func (si *StageInfo) SetPodsForStageInfo(kubeClient kubernetes.Interface, tektonClient tektonclient.Interface, ns, prName string) error {
if si.Task != "" {
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{
pipeline.GroupName + pipeline.PipelineRunLabelKey: prName,
syntax.LabelStageName: syntax.MangleToRfc1035Label(si.Name, ""),
}})
if err != nil {
return err
}
podList, err := kubeClient.CoreV1().Pods(ns).List(metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
return err
func (si *StageInfo) SetPodsForStageInfo(podList *corev1.PodList, prName string) error {
var podListItems []corev1.Pod

for _, p := range podList.Items {
if p.Labels[syntax.LabelStageName] == syntax.MangleToRfc1035Label(si.Name, "") && p.Labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] == prName {
podListItems = append(podListItems, p)
}
if len(podList.Items) == 0 {
}

if si.Task != "" {
if len(podListItems) == 0 {
// TODO: Probably the pod just hasn't started yet, so return nil
return nil
}
if len(podList.Items) > 1 {
return errors.New(fmt.Sprintf("Too many Pods (%d) found for PipelineRun %s and Stage %s", len(podList.Items), prName, si.Name))
if len(podListItems) > 1 {
return errors.New(fmt.Sprintf("Too many Pods (%d) found for PipelineRun %s and Stage %s", len(podListItems), prName, si.Name))
}
pod := podList.Items[0]
pod := podListItems[0]
si.PodName = pod.Name
si.Task = pod.Labels[builds.LabelTaskName]
si.TaskRun = pod.Labels[builds.LabelTaskRunName]
Expand All @@ -398,13 +389,13 @@ func (si *StageInfo) SetPodsForStageInfo(kubeClient kubernetes.Interface, tekton
}
} else if len(si.Stages) > 0 {
for _, child := range si.Stages {
if err := child.SetPodsForStageInfo(kubeClient, tektonClient, ns, prName); err != nil {
if err := child.SetPodsForStageInfo(podList, prName); err != nil {
return err
}
}
} else if len(si.Parallel) > 0 {
for _, child := range si.Parallel {
if err := child.SetPodsForStageInfo(kubeClient, tektonClient, ns, prName); err != nil {
if err := child.SetPodsForStageInfo(podList, prName); err != nil {
return err
}
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/tekton/pipeline_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (

"github.com/ghodss/yaml"
"github.com/google/go-cmp/cmp"
v1fake "github.com/jenkins-x/jx/pkg/client/clientset/versioned/fake"
"github.com/jenkins-x/jx/pkg/gits"
"github.com/jenkins-x/jx/pkg/tekton"
"github.com/jenkins-x/jx/pkg/tekton/syntax"
"github.com/jenkins-x/jx/pkg/tekton/tekton_helpers_test"
tektonfake "github.com/knative/build-pipeline/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
)

func TestCreatePipelineRunInfo(t *testing.T) {
Expand Down Expand Up @@ -190,22 +189,26 @@ func TestCreatePipelineRunInfo(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
testCaseDir := path.Join("test_data", "pipeline_info", tt.name)
kubeClient := fake.NewSimpleClientset(tekton_helpers_test.AssertLoadPods(t, testCaseDir))

jxObjects := []runtime.Object{tekton_helpers_test.AssertLoadPipelineActivity(t, testCaseDir)}
structure := tekton_helpers_test.AssertLoadPipelineStructure(t, testCaseDir)
if structure != nil {
jxObjects = append(jxObjects, structure)
}
jxClient := v1fake.NewSimpleClientset(jxObjects...)

tektonObjects := []runtime.Object{tekton_helpers_test.AssertLoadPipelineRun(t, testCaseDir), tekton_helpers_test.AssertLoadPipeline(t, testCaseDir)}
tektonObjects = append(tektonObjects, tekton_helpers_test.AssertLoadTasks(t, testCaseDir))
tektonObjects = append(tektonObjects, tekton_helpers_test.AssertLoadTaskRuns(t, testCaseDir))
tektonObjects = append(tektonObjects, tekton_helpers_test.AssertLoadPipelineResources(t, testCaseDir))
tektonClient := tektonfake.NewSimpleClientset(tektonObjects...)

pri, err := tekton.CreatePipelineRunInfo(kubeClient, tektonClient, jxClient, ns, tt.prName)
podList := tekton_helpers_test.AssertLoadPods(t, testCaseDir)

pr, err := tektonClient.TektonV1alpha1().PipelineRuns(ns).Get(tt.prName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error fetching PipelineRun: %s", err)
}
pri, err := tekton.CreatePipelineRunInfo(tt.prName, podList, structure, pr)
if err != nil {
t.Fatalf("Error creating PipelineRunInfo: %s", err)
}
Expand Down

0 comments on commit c62422a

Please sign in to comment.