Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Backend + SDK): Update kfp backend and kubernetes sdk to support pod labels and annotations #10393

Merged
merged 11 commits into from Feb 9, 2024
5 changes: 5 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo_test.go
Expand Up @@ -52,6 +52,11 @@ func Test_argo_compiler(t *testing.T) {
platformSpecPath: "../testdata/create_mount_delete_dynamic_pvc_platform.json",
argoYAMLPath: "testdata/create_mount_delete_dynamic_pvc.yaml",
},
{
jobPath: "../testdata/hello_world.json",
platformSpecPath: "../testdata/create_pod_metadata.json",
argoYAMLPath: "testdata/create_pod_metadata.yaml",
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
Expand Down
84 changes: 67 additions & 17 deletions backend/src/v2/compiler/argocompiler/container.go
Expand Up @@ -15,19 +15,22 @@
package argocompiler

import (
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"os"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/golang/protobuf/jsonpb"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/component"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
k8score "k8s.io/api/core/v1"
)

const (
volumeNameKFPLauncher = "kfp-launcher"
DefaultLauncherImage = "gcr.io/ml-pipeline/kfp-launcher@sha256:80cf120abd125db84fa547640fd6386c4b2a26936e0c2b04a7d3634991a850a4"
DefaultLauncherImage = "gcr.io/ml-pipeline/kfp-launcher@sha256:80cf120abd125db84fa547640fd6386c4b2a26936e0c2b04a7d3634991a850a4"
LauncherImageEnvVar = "V2_LAUNCHER_IMAGE"
DefaultDriverImage = "gcr.io/ml-pipeline/kfp-driver@sha256:8e60086b04d92b657898a310ca9757631d58547e76bbbb8bfc376d654bef1707"
DriverImageEnvVar = "V2_DRIVER_IMAGE"
DefaultDriverImage = "gcr.io/ml-pipeline/kfp-driver@sha256:8e60086b04d92b657898a310ca9757631d58547e76bbbb8bfc376d654bef1707"
DriverImageEnvVar = "V2_DRIVER_IMAGE"
)

func (c *workflowCompiler) Container(name string, component *pipelinespec.ComponentSpec, container *pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec) error {
Expand Down Expand Up @@ -58,19 +61,19 @@ type containerDriverInputs struct {
}

func GetLauncherImage() string {
launcherImage := os.Getenv(LauncherImageEnvVar)
if launcherImage == "" {
launcherImage = DefaultLauncherImage
}
return launcherImage
launcherImage := os.Getenv(LauncherImageEnvVar)
Tomcli marked this conversation as resolved.
Show resolved Hide resolved
if launcherImage == "" {
launcherImage = DefaultLauncherImage
}
return launcherImage
}

func GetDriverImage() string {
driverImage := os.Getenv(DriverImageEnvVar)
if driverImage == "" {
driverImage = DefaultDriverImage
}
return driverImage
driverImage := os.Getenv(DriverImageEnvVar)
if driverImage == "" {
driverImage = DefaultDriverImage
}
return driverImage
}

func (c *workflowCompiler) containerDriverTask(name string, inputs containerDriverInputs) (*wfapi.DAGTask, *containerDriverOutputs) {
Expand Down Expand Up @@ -169,14 +172,14 @@ type containerExecutorInputs struct {
// name: argo workflows DAG task name
// The other arguments are argo workflows task parameters, they can be either a
// string or a placeholder.
func (c *workflowCompiler) containerExecutorTask(name string, inputs containerExecutorInputs) *wfapi.DAGTask {
func (c *workflowCompiler) containerExecutorTask(name string, inputs containerExecutorInputs, refName string) *wfapi.DAGTask {
when := ""
if inputs.condition != "" {
when = inputs.condition + " != false"
}
return &wfapi.DAGTask{
Name: name,
Template: c.addContainerExecutorTemplate(),
Template: c.addContainerExecutorTemplate(refName),
When: when,
Arguments: wfapi.Arguments{
Parameters: []wfapi.Parameter{
Expand All @@ -191,7 +194,7 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
// any container component task.
// During runtime, it's expected that pod-spec-patch will specify command, args
// and resources etc, that are different for different tasks.
func (c *workflowCompiler) addContainerExecutorTemplate() string {
func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
// container template is parent of container implementation template
nameContainerExecutor := "system-container-executor"
nameContainerImpl := "system-container-impl"
Expand Down Expand Up @@ -273,7 +276,54 @@ func (c *workflowCompiler) addContainerExecutorTemplate() string {
Env: commonEnvs,
},
}
// Update pod metadata if it defined in the Kubernetes Spec
if kubernetesConfigString, ok := c.wf.Annotations[annotationKubernetesSpec+refName]; ok {
k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(kubernetesConfigString, k8sExecCfg); err == nil {
extendPodMetadata(&executor.Metadata, k8sExecCfg)
}
}
c.templates[nameContainerImpl] = executor
c.wf.Spec.Templates = append(c.wf.Spec.Templates, *container, *executor)
return nameContainerExecutor
}

// Extends the PodMetadata to include Kubernetes-specific executor config.
// Although the current podMetadata object is always empty, this function
// doesn't overwrite the existing podMetadata because for security reasons
// the existing podMetadata should have higher privilege than the user definition.
func extendPodMetadata(
podMetadata *wfapi.Metadata,
kubernetesExecutorConfig *kubernetesplatform.KubernetesExecutorConfig,
) {
// Get pod metadata information
if kubernetesExecutorConfig.GetPodMetadata() != nil {
if kubernetesExecutorConfig.GetPodMetadata().GetLabels() != nil {
Tomcli marked this conversation as resolved.
Show resolved Hide resolved
if podMetadata.Labels == nil {
podMetadata.Labels = kubernetesExecutorConfig.GetPodMetadata().GetLabels()
} else {
podMetadata.Labels = extendMetadataMap(podMetadata.Labels, kubernetesExecutorConfig.GetPodMetadata().GetLabels())
}
}
if kubernetesExecutorConfig.GetPodMetadata().GetAnnotations() != nil {
if podMetadata.Annotations == nil {
podMetadata.Annotations = kubernetesExecutorConfig.GetPodMetadata().GetAnnotations()
} else {
podMetadata.Annotations = extendMetadataMap(podMetadata.Annotations, kubernetesExecutorConfig.GetPodMetadata().GetAnnotations())
}
}
}
}

// Extends metadata map values, highPriorityMap should overwrites lowPriorityMap values
// The original Map inputs should have higher priority since its defined by admin
// TODO: Use maps.Copy after moving to go 1.21+
func extendMetadataMap(
highPriorityMap map[string]string,
lowPriorityMap map[string]string,
) map[string]string {
for k, v := range highPriorityMap {
lowPriorityMap[k] = v
}
return lowPriorityMap
}
61 changes: 61 additions & 0 deletions backend/src/v2/compiler/argocompiler/container_test.go
@@ -0,0 +1,61 @@
// Copyright 2021-2024 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package argocompiler

import (
"testing"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
"github.com/stretchr/testify/assert"
)

func Test_extendPodMetadata(t *testing.T) {
Tomcli marked this conversation as resolved.
Show resolved Hide resolved
tests := []struct {
name string
podMetadata *wfapi.Metadata
kubernetesExecutorConfig *kubernetesplatform.KubernetesExecutorConfig
expected *wfapi.Metadata
}{
{
"Valid - add pod labels and annotations",
&wfapi.Metadata{},
&kubernetesplatform.KubernetesExecutorConfig{
PodMetadata: &kubernetesplatform.PodMetadata{
Annotations: map[string]string{
"run_id": "123456",
},
Labels: map[string]string{
"kubeflow.com/kfp": "pipeline-node",
},
},
},
&wfapi.Metadata{
Annotations: map[string]string{
"run_id": "123456",
},
Labels: map[string]string{
"kubeflow.com/kfp": "pipeline-node",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
extendPodMetadata(tt.podMetadata, tt.kubernetesExecutorConfig)
assert.Equal(t, tt.expected, tt.podMetadata)
})
}
}
2 changes: 1 addition & 1 deletion backend/src/v2/compiler/argocompiler/dag.go
Expand Up @@ -232,7 +232,7 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
podSpecPatch: driverOutputs.podSpecPatch,
cachedDecision: driverOutputs.cached,
condition: driverOutputs.condition,
})
}, task.GetComponentRef().GetName())
executor.Depends = depends([]string{driverTaskName})
return []wfapi.DAGTask{*driver, *executor}, nil
case *pipelinespec.PipelineDeploymentConfig_ExecutorSpec_Importer:
Expand Down