Skip to content

Commit

Permalink
Merge branch 'master' into fieldpath-backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomcli committed Feb 28, 2024
2 parents 058e22f + c051e55 commit 489ce7f
Show file tree
Hide file tree
Showing 106 changed files with 34,777 additions and 1,864 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -56,7 +56,7 @@ The meeting is happening every other Wed 10-11AM (PST)
* [Part 1: How to create and deploy a Kubeflow Machine Learning Pipeline](https://towardsdatascience.com/how-to-create-and-deploy-a-kubeflow-machine-learning-pipeline-part-1-efea7a4b650f)
* [Part 2: How to deploy Jupyter notebooks as components of a Kubeflow ML pipeline](https://towardsdatascience.com/how-to-deploy-jupyter-notebooks-as-components-of-a-kubeflow-ml-pipeline-part-2-b1df77f4e5b3)
* [Part 3: How to carry out CI/CD in Machine Learning (“MLOps”) using Kubeflow ML pipelines](https://medium.com/google-cloud/how-to-carry-out-ci-cd-in-machine-learning-mlops-using-kubeflow-ml-pipelines-part-3-bdaf68082112)
* [Kubeflow Pipelines meets Tekton](https://developer.ibm.com/blogs/kubeflow-pipelines-with-tekton-and-watson/) (By Animesh Singh)
* [Tekton optimizations for Kubeflow Pipelines 2.0](https://developer.ibm.com/blogs/awb-tekton-optimizations-for-kubeflow-pipelines-2-0) (By Tommy Li)
## Acknowledgments

Kubeflow pipelines uses [Argo Workflows](https://github.com/argoproj/argo-workflows) by default under the hood to orchestrate Kubernetes resources. The Argo community has been very supportive and we are very grateful. Additionally there is Tekton backend available as well. To access it, please refer to [Kubeflow Pipelines with Tekton repository](https://github.com/kubeflow/kfp-tekton).
3 changes: 1 addition & 2 deletions backend/OWNERS
@@ -1,7 +1,6 @@
approvers:
- chensun
- gkcalat
- Tomcli
reviewers:
- chensun
- gkcalat
- Tomcli
9 changes: 9 additions & 0 deletions backend/src/v2/cmd/driver/execution_paths.go
@@ -0,0 +1,9 @@
package main

type ExecutionPaths struct {
ExecutionID string
IterationCount string
CachedDecision string
Condition string
PodSpecPatch string
}
48 changes: 38 additions & 10 deletions backend/src/v2/cmd/driver/main.go
Expand Up @@ -37,6 +37,9 @@ import (

const (
driverTypeArg = "type"
ROOT_DAG = "ROOT_DAG"
DAG = "DAG"
CONTAINER = "CONTAINER"
)

var (
Expand Down Expand Up @@ -160,12 +163,12 @@ func drive() (err error) {
var execution *driver.Execution
var driverErr error
switch *driverType {
case "ROOT_DAG":
case ROOT_DAG:
options.RuntimeConfig = runtimeConfig
execution, driverErr = driver.RootDAG(ctx, options, client)
case "DAG":
case DAG:
execution, driverErr = driver.DAG(ctx, options, client)
case "CONTAINER":
case CONTAINER:
options.Container = containerSpec
options.KubernetesExecutorConfig = k8sExecCfg
execution, driverErr = driver.Container(ctx, options, client, cacheClient)
Expand All @@ -183,35 +186,60 @@ func drive() (err error) {
err = driverErr
}()
}

executionPaths := &ExecutionPaths{
ExecutionID: *executionIDPath,
IterationCount: *iterationCountPath,
CachedDecision: *cachedDecisionPath,
Condition: *conditionPath,
PodSpecPatch: *podSpecPatchPath}

return handleExecution(execution, *driverType, executionPaths)
}

func handleExecution(execution *driver.Execution, driverType string, executionPaths *ExecutionPaths) error {
if execution.ID != 0 {
glog.Infof("output execution.ID=%v", execution.ID)
if *executionIDPath != "" {
if err = writeFile(*executionIDPath, []byte(fmt.Sprint(execution.ID))); err != nil {
if executionPaths.ExecutionID != "" {
if err := writeFile(executionPaths.ExecutionID, []byte(fmt.Sprint(execution.ID))); err != nil {
return fmt.Errorf("failed to write execution ID to file: %w", err)
}
}
}
if execution.IterationCount != nil {
if err = writeFile(*iterationCountPath, []byte(fmt.Sprintf("%v", *execution.IterationCount))); err != nil {
if err := writeFile(executionPaths.IterationCount, []byte(fmt.Sprintf("%v", *execution.IterationCount))); err != nil {
return fmt.Errorf("failed to write iteration count to file: %w", err)
}
} else {
if driverType == ROOT_DAG {
if err := writeFile(executionPaths.IterationCount, []byte("0")); err != nil {
return fmt.Errorf("failed to write iteration count to file: %w", err)
}
}
}
if execution.Cached != nil {
if err = writeFile(*cachedDecisionPath, []byte(strconv.FormatBool(*execution.Cached))); err != nil {
if err := writeFile(executionPaths.CachedDecision, []byte(strconv.FormatBool(*execution.Cached))); err != nil {
return fmt.Errorf("failed to write cached decision to file: %w", err)
}
}
if execution.Condition != nil {
if err = writeFile(*conditionPath, []byte(strconv.FormatBool(*execution.Condition))); err != nil {
if err := writeFile(executionPaths.Condition, []byte(strconv.FormatBool(*execution.Condition))); err != nil {
return fmt.Errorf("failed to write condition to file: %w", err)
}
} else {
// nil is a valid value for Condition
if driverType == ROOT_DAG || driverType == CONTAINER {
if err := writeFile(executionPaths.Condition, []byte("nil")); err != nil {
return fmt.Errorf("failed to write condition to file: %w", err)
}
}
}
if execution.PodSpecPatch != "" {
glog.Infof("output podSpecPatch=\n%s\n", execution.PodSpecPatch)
if *podSpecPatchPath == "" {
if executionPaths.PodSpecPatch == "" {
return fmt.Errorf("--pod_spec_patch_path is required for container executor drivers")
}
if err = writeFile(*podSpecPatchPath, []byte(execution.PodSpecPatch)); err != nil {
if err := writeFile(executionPaths.PodSpecPatch, []byte(execution.PodSpecPatch)); err != nil {
return fmt.Errorf("failed to write pod spec patch to file: %w", err)
}
}
Expand Down
79 changes: 79 additions & 0 deletions backend/src/v2/cmd/driver/main_test.go
@@ -0,0 +1,79 @@
package main

import (
"github.com/kubeflow/pipelines/backend/src/v2/driver"
"os"
"testing"
)

func Test_handleExecutionContainer(t *testing.T) {
execution := &driver.Execution{}

executionPaths := &ExecutionPaths{
Condition: "condition.txt",
}

err := handleExecution(execution, CONTAINER, executionPaths)

if err != nil {
t.Errorf("Unexpected error: %v", err)
}

verifyFileContent(t, executionPaths.Condition, "nil")

cleanup(t, executionPaths)
}

func Test_handleExecutionRootDAG(t *testing.T) {
execution := &driver.Execution{}

executionPaths := &ExecutionPaths{
IterationCount: "iteration_count.txt",
Condition: "condition.txt",
}

err := handleExecution(execution, ROOT_DAG, executionPaths)

if err != nil {
t.Errorf("Unexpected error: %v", err)
}

verifyFileContent(t, executionPaths.IterationCount, "0")
verifyFileContent(t, executionPaths.Condition, "nil")

cleanup(t, executionPaths)
}

func cleanup(t *testing.T, executionPaths *ExecutionPaths) {
removeIfExists(t, executionPaths.IterationCount)
removeIfExists(t, executionPaths.ExecutionID)
removeIfExists(t, executionPaths.Condition)
removeIfExists(t, executionPaths.PodSpecPatch)
removeIfExists(t, executionPaths.CachedDecision)
}

func removeIfExists(t *testing.T, filePath string) {
_, err := os.Stat(filePath)
if err == nil {
err = os.Remove(filePath)
if err != nil {
t.Errorf("Unexpected error while removing the created file: %v", err)
}
}
}

func verifyFileContent(t *testing.T, filePath string, expectedContent string) {
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
t.Errorf("Expected file %s to be created, but it doesn't exist", filePath)
}

fileContent, err := os.ReadFile(filePath)
if err != nil {
t.Errorf("Failed to read file contents: %v", err)
}

if string(fileContent) != expectedContent {
t.Errorf("Expected file fileContent to be %q, got %q", expectedContent, string(fileContent))
}
}
55 changes: 55 additions & 0 deletions backend/src/v2/driver/driver.go
Expand Up @@ -480,6 +480,28 @@ func extendPodSpecPatch(
podSpec.NodeSelector = kubernetesExecutorConfig.GetNodeSelector().GetLabels()
}

if tolerations := kubernetesExecutorConfig.GetTolerations(); tolerations != nil {
var k8sTolerations []k8score.Toleration

glog.Infof("Tolerations passed: %+v", tolerations)

for _, toleration := range tolerations {
if toleration != nil {
k8sToleration := k8score.Toleration{
Key: toleration.Key,
Operator: k8score.TolerationOperator(toleration.Operator),
Value: toleration.Value,
Effect: k8score.TaintEffect(toleration.Effect),
TolerationSeconds: toleration.TolerationSeconds,
}

k8sTolerations = append(k8sTolerations, k8sToleration)
}
}

podSpec.Tolerations = k8sTolerations
}

// Get secret mount information
for _, secretAsVolume := range kubernetesExecutorConfig.GetSecretAsVolume() {
secretVolume := k8score.Volume{
Expand Down Expand Up @@ -512,6 +534,39 @@ func extendPodSpecPatch(
}
}

// Get config map mount information
for _, configMapAsVolume := range kubernetesExecutorConfig.GetConfigMapAsVolume() {
configMapVolume := k8score.Volume{
Name: configMapAsVolume.GetConfigMapName(),
VolumeSource: k8score.VolumeSource{
ConfigMap: &k8score.ConfigMapVolumeSource{
LocalObjectReference: k8score.LocalObjectReference{Name: configMapAsVolume.GetConfigMapName()}},
},
}
configMapVolumeMount := k8score.VolumeMount{
Name: configMapAsVolume.GetConfigMapName(),
MountPath: configMapAsVolume.GetMountPath(),
}
podSpec.Volumes = append(podSpec.Volumes, configMapVolume)
podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, configMapVolumeMount)
}

// Get config map env information
for _, configMapAsEnv := range kubernetesExecutorConfig.GetConfigMapAsEnv() {
for _, keyToEnv := range configMapAsEnv.GetKeyToEnv() {
configMapEnvVar := k8score.EnvVar{
Name: keyToEnv.GetEnvVar(),
ValueFrom: &k8score.EnvVarSource{
ConfigMapKeyRef: &k8score.ConfigMapKeySelector{
Key: keyToEnv.GetConfigMapKey(),
},
},
}
configMapEnvVar.ValueFrom.ConfigMapKeyRef.LocalObjectReference.Name = configMapAsEnv.GetConfigMapName()
podSpec.Containers[0].Env = append(podSpec.Containers[0].Env, configMapEnvVar)
}
}

// Get image pull secret information
for _, imagePullSecret := range kubernetesExecutorConfig.GetImagePullSecret() {
podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets, k8score.LocalObjectReference{Name: imagePullSecret.GetSecretName()})
Expand Down

0 comments on commit 489ce7f

Please sign in to comment.