Skip to content

Commit

Permalink
feat(Backend + SDK): Update kfp backend and kubernetes sdk to support…
Browse files Browse the repository at this point in the history
… pod labels and annotations (kubeflow#10393)

* update kfp kubernetes sdk to include pod labels and annotations

* fix unit test output order

* add podmetadata changes

* update argo compiler to support pod metadata

* update tests

* update go mod to use the latest kubernetes_platform package

* update licenses

* address comments

* update kubernetes_platform package to include the latest spec

---------

Co-authored-by: Chen Sun <chensun@users.noreply.github.com>
  • Loading branch information
2 people authored and stijntratsaertit committed Feb 16, 2024
1 parent d0ff031 commit ee376a4
Show file tree
Hide file tree
Showing 11 changed files with 742 additions and 18 deletions.
5 changes: 5 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo_test.go
Expand Up @@ -52,6 +52,11 @@ func Test_argo_compiler(t *testing.T) {
platformSpecPath: "../testdata/create_mount_delete_dynamic_pvc_platform.json",
argoYAMLPath: "testdata/create_mount_delete_dynamic_pvc.yaml",
},
{
jobPath: "../testdata/hello_world.json",
platformSpecPath: "../testdata/create_pod_metadata.json",
argoYAMLPath: "testdata/create_pod_metadata.yaml",
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
Expand Down
86 changes: 69 additions & 17 deletions backend/src/v2/compiler/argocompiler/container.go
Expand Up @@ -15,19 +15,22 @@
package argocompiler

import (
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"os"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/golang/protobuf/jsonpb"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/component"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
k8score "k8s.io/api/core/v1"
)

const (
volumeNameKFPLauncher = "kfp-launcher"
DefaultLauncherImage = "gcr.io/ml-pipeline/kfp-launcher@sha256:80cf120abd125db84fa547640fd6386c4b2a26936e0c2b04a7d3634991a850a4"
DefaultLauncherImage = "gcr.io/ml-pipeline/kfp-launcher@sha256:80cf120abd125db84fa547640fd6386c4b2a26936e0c2b04a7d3634991a850a4"
LauncherImageEnvVar = "V2_LAUNCHER_IMAGE"
DefaultDriverImage = "gcr.io/ml-pipeline/kfp-driver@sha256:8e60086b04d92b657898a310ca9757631d58547e76bbbb8bfc376d654bef1707"
DriverImageEnvVar = "V2_DRIVER_IMAGE"
DefaultDriverImage = "gcr.io/ml-pipeline/kfp-driver@sha256:8e60086b04d92b657898a310ca9757631d58547e76bbbb8bfc376d654bef1707"
DriverImageEnvVar = "V2_DRIVER_IMAGE"
)

func (c *workflowCompiler) Container(name string, component *pipelinespec.ComponentSpec, container *pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec) error {
Expand Down Expand Up @@ -58,19 +61,19 @@ type containerDriverInputs struct {
}

func GetLauncherImage() string {
launcherImage := os.Getenv(LauncherImageEnvVar)
if launcherImage == "" {
launcherImage = DefaultLauncherImage
}
return launcherImage
launcherImage := os.Getenv(LauncherImageEnvVar)
if launcherImage == "" {
launcherImage = DefaultLauncherImage
}
return launcherImage
}

func GetDriverImage() string {
driverImage := os.Getenv(DriverImageEnvVar)
if driverImage == "" {
driverImage = DefaultDriverImage
}
return driverImage
driverImage := os.Getenv(DriverImageEnvVar)
if driverImage == "" {
driverImage = DefaultDriverImage
}
return driverImage
}

func (c *workflowCompiler) containerDriverTask(name string, inputs containerDriverInputs) (*wfapi.DAGTask, *containerDriverOutputs) {
Expand Down Expand Up @@ -169,14 +172,14 @@ type containerExecutorInputs struct {
// name: argo workflows DAG task name
// The other arguments are argo workflows task parameters, they can be either a
// string or a placeholder.
func (c *workflowCompiler) containerExecutorTask(name string, inputs containerExecutorInputs) *wfapi.DAGTask {
func (c *workflowCompiler) containerExecutorTask(name string, inputs containerExecutorInputs, refName string) *wfapi.DAGTask {
when := ""
if inputs.condition != "" {
when = inputs.condition + " != false"
}
return &wfapi.DAGTask{
Name: name,
Template: c.addContainerExecutorTemplate(),
Template: c.addContainerExecutorTemplate(refName),
When: when,
Arguments: wfapi.Arguments{
Parameters: []wfapi.Parameter{
Expand All @@ -191,7 +194,7 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
// any container component task.
// During runtime, it's expected that pod-spec-patch will specify command, args
// and resources etc, that are different for different tasks.
func (c *workflowCompiler) addContainerExecutorTemplate() string {
func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
// container template is parent of container implementation template
nameContainerExecutor := "system-container-executor"
nameContainerImpl := "system-container-impl"
Expand Down Expand Up @@ -273,7 +276,56 @@ func (c *workflowCompiler) addContainerExecutorTemplate() string {
Env: commonEnvs,
},
}
// Update pod metadata if it defined in the Kubernetes Spec
if kubernetesConfigString, ok := c.wf.Annotations[annotationKubernetesSpec+refName]; ok {
k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(kubernetesConfigString, k8sExecCfg); err == nil {
extendPodMetadata(&executor.Metadata, k8sExecCfg)
}
}
c.templates[nameContainerImpl] = executor
c.wf.Spec.Templates = append(c.wf.Spec.Templates, *container, *executor)
return nameContainerExecutor
}

// Extends the PodMetadata to include Kubernetes-specific executor config.
// Although the current podMetadata object is always empty, this function
// doesn't overwrite the existing podMetadata because for security reasons
// the existing podMetadata should have higher privilege than the user definition.
func extendPodMetadata(
podMetadata *wfapi.Metadata,
kubernetesExecutorConfig *kubernetesplatform.KubernetesExecutorConfig,
) {
// Get pod metadata information
if kubernetesExecutorConfig.GetPodMetadata() != nil {
labels := kubernetesExecutorConfig.GetPodMetadata().GetLabels()
if labels != nil {
if podMetadata.Labels == nil {
podMetadata.Labels = labels
} else {
podMetadata.Labels = extendMetadataMap(podMetadata.Labels, labels)
}
}
annotations := kubernetesExecutorConfig.GetPodMetadata().GetAnnotations()
if annotations != nil {
if podMetadata.Annotations == nil {
podMetadata.Annotations = annotations
} else {
podMetadata.Annotations = extendMetadataMap(podMetadata.Annotations, annotations)
}
}
}
}

// Extends metadata map values, highPriorityMap should overwrites lowPriorityMap values
// The original Map inputs should have higher priority since its defined by admin
// TODO: Use maps.Copy after moving to go 1.21+
func extendMetadataMap(
highPriorityMap map[string]string,
lowPriorityMap map[string]string,
) map[string]string {
for k, v := range highPriorityMap {
lowPriorityMap[k] = v
}
return lowPriorityMap
}
90 changes: 90 additions & 0 deletions backend/src/v2/compiler/argocompiler/container_test.go
@@ -0,0 +1,90 @@
// Copyright 2021-2024 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package argocompiler

import (
"testing"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
"github.com/stretchr/testify/assert"
)

func Test_extendPodMetadata(t *testing.T) {
tests := []struct {
name string
podMetadata *wfapi.Metadata
kubernetesExecutorConfig *kubernetesplatform.KubernetesExecutorConfig
expected *wfapi.Metadata
}{
{
"Valid - add pod labels and annotations",
&wfapi.Metadata{},
&kubernetesplatform.KubernetesExecutorConfig{
PodMetadata: &kubernetesplatform.PodMetadata{
Annotations: map[string]string{
"run_id": "123456",
},
Labels: map[string]string{
"kubeflow.com/kfp": "pipeline-node",
},
},
},
&wfapi.Metadata{
Annotations: map[string]string{
"run_id": "123456",
},
Labels: map[string]string{
"kubeflow.com/kfp": "pipeline-node",
},
},
},
{
"Valid - try overwrite default pod labels and annotations",
&wfapi.Metadata{
Annotations: map[string]string{
"run_id": "654321",
},
Labels: map[string]string{
"kubeflow.com/kfp": "default-node",
},
},
&kubernetesplatform.KubernetesExecutorConfig{
PodMetadata: &kubernetesplatform.PodMetadata{
Annotations: map[string]string{
"run_id": "123456",
},
Labels: map[string]string{
"kubeflow.com/kfp": "pipeline-node",
},
},
},
&wfapi.Metadata{
Annotations: map[string]string{
"run_id": "654321",
},
Labels: map[string]string{
"kubeflow.com/kfp": "default-node",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
extendPodMetadata(tt.podMetadata, tt.kubernetesExecutorConfig)
assert.Equal(t, tt.expected, tt.podMetadata)
})
}
}
2 changes: 1 addition & 1 deletion backend/src/v2/compiler/argocompiler/dag.go
Expand Up @@ -232,7 +232,7 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
podSpecPatch: driverOutputs.podSpecPatch,
cachedDecision: driverOutputs.cached,
condition: driverOutputs.condition,
})
}, task.GetComponentRef().GetName())
executor.Depends = depends([]string{driverTaskName})
return []wfapi.DAGTask{*driver, *executor}, nil
case *pipelinespec.PipelineDeploymentConfig_ExecutorSpec_Importer:
Expand Down

0 comments on commit ee376a4

Please sign in to comment.