Skip to content

Commit

Permalink
feat(pipeline-loop v2): Merge loop driver and publisher into the pipe…
Browse files Browse the repository at this point in the history
…lineloop controller logic (#1428)

* add driver to pipelineloop reconcile

* update pipelineloop startup args

* update klog error

* update driver logic

* update image build and fix update logic
  • Loading branch information
Tomcli committed Dec 18, 2023
1 parent 1196489 commit b6fcff9
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 27 deletions.
2 changes: 1 addition & 1 deletion tekton-catalog/pipeline-loops/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ubuntu
FROM alpine:3.19
ARG bin_dir=_output/bin
ARG bin_name
ENV BIN ${bin_name}
Expand Down
13 changes: 7 additions & 6 deletions tekton-catalog/pipeline-loops/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ validate-examples: cli $(info validate-examples: validate example yamls for pipe
\( -type f -name "*yaml" \) -print0 | xargs -0 -n1 ${BIN_DIR}/pipelineloop-cli -f

local: update init
go build -o=${BIN_DIR}/pipelineloop-controller ./cmd/controller
go build -o=${BIN_DIR}/pipelineloop-webhook ./cmd/webhook
go build -o=${BIN_DIR}/pipelineloop-cli ./cmd/cli
CGO_ENABLED=0 GO111MODULE=on go build -mod=vendor -o=${BIN_DIR}/pipelineloop-controller ./cmd/controller
CGO_ENABLED=0 GO111MODULE=on go build -mod=vendor -o=${BIN_DIR}/pipelineloop-webhook ./cmd/webhook
CGO_ENABLED=0 GO111MODULE=on go build -mod=vendor -o=${BIN_DIR}/pipelineloop-cli ./cmd/cli

build-linux: update init
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o=${BIN_DIR}/pipelineloop-controller ./cmd/controller
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o=${BIN_DIR}/pipelineloop-webhook ./cmd/webhook
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o=${BIN_DIR}/pipelineloop-cli ./cmd/cli
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -mod=vendor -o=${BIN_DIR}/pipelineloop-controller ./cmd/controller
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -mod=vendor -o=${BIN_DIR}/pipelineloop-webhook ./cmd/webhook
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -mod=vendor -o=${BIN_DIR}/pipelineloop-cli ./cmd/cli

images: build-linux
docker build --build-arg bin_name=pipelineloop-controller . -t ${DOCKER_REGISTRY}/pipelineloop-controller:$(TAG)
Expand All @@ -63,6 +63,7 @@ update:
go mod download
go mod tidy
go mod vendor
patch -u vendor/k8s.io/klog/v2/klog.go pkg/controller/klog.patch

clean:
rm -r ${BIN_DIR}
Expand Down
2 changes: 1 addition & 1 deletion tekton-catalog/pipeline-loops/config/201-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rules:
resources: ["runs/status", "customruns/status", "taskruns/status", "pipelineruns/status", "runs/finalizers", "customruns/finalizers",]
verbs: ["get", "list", "create", "update", "delete", "patch", "watch"]
- apiGroups: ["custom.tekton.dev"]
resources: ["pipelineloops"]
resources: ["pipelineloops", "kfptasks"]
verbs: ["get", "list", "create", "update", "delete", "patch", "watch"]
- apiGroups: ["apps"]
resources: ["deployments", "deployments/finalizers"]
Expand Down
134 changes: 126 additions & 8 deletions tekton-catalog/pipeline-loops/go.mod
Original file line number Diff line number Diff line change
@@ -1,28 +1,146 @@
module github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops

go 1.13
go 1.19

require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/google/go-cmp v0.5.9
github.com/cenkalti/backoff/v4 v4.2.0
github.com/google/go-cmp v0.6.0
github.com/hashicorp/go-multierror v1.1.1
github.com/kubeflow/kfp-tekton/tekton-catalog/cache v0.0.0
github.com/kubeflow/kfp-tekton/tekton-catalog/objectstore v0.0.0
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-00010101000000-000000000000
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20231027040853-58ce09e07d03
github.com/tektoncd/pipeline v0.53.2
go.uber.org/zap v1.24.0
gomodules.xyz/jsonpatch/v2 v2.2.0
go.uber.org/zap v1.26.0
gomodules.xyz/jsonpatch/v2 v2.4.0
k8s.io/api v0.27.1
k8s.io/apimachinery v0.27.1
k8s.io/client-go v0.27.1
k8s.io/apimachinery v0.27.3
k8s.io/client-go v0.27.2
k8s.io/utils v0.0.0-20230505201702-9f6742963106
knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0
knative.dev/pkg v0.0.0-20231011201526-df28feae6d34
)

require (
cloud.google.com/go v0.110.8 // indirect
cloud.google.com/go/compute v1.23.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.2 // indirect
cloud.google.com/go/storage v1.30.1 // indirect
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/IBM/ibm-cos-sdk-go v1.8.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
github.com/aws/aws-sdk-go v1.42.50 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudevents/sdk-go/v2 v2.14.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.10.2 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/gobuffalo/flect v0.2.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/cel-go v0.12.6 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/go-containerregistry v0.16.1 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/google/wire v0.4.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.1 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03 // indirect
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03 // indirect
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-sqlite3 v1.14.16 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/automaxprocs v1.4.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
gocloud.dev v0.22.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.13.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.147.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/mysql v1.4.3 // indirect
gorm.io/driver/sqlite v1.4.2 // indirect
gorm.io/gorm v1.24.0 // indirect
k8s.io/apiextensions-apiserver v0.26.5 // indirect
k8s.io/code-generator v0.26.5 // indirect
k8s.io/gengo v0.0.0-20221011193443-fad74ee6edd9 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect
knative.dev/hack v0.0.0-20230417170854-f591fea109b3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

replace (
github.com/kubeflow/kfp-tekton/tekton-catalog/cache => ../cache/
github.com/kubeflow/kfp-tekton/tekton-catalog/objectstore => ../objectstore/
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask => ../tekton-kfptask/
k8s.io/api => k8s.io/api v0.25.9
k8s.io/apimachinery => k8s.io/apimachinery v0.26.5
k8s.io/client-go => k8s.io/client-go v0.25.9
k8s.io/code-generator => k8s.io/code-generator v0.25.9
k8s.io/kubernetes => k8s.io/kubernetes v1.11.1
sigs.k8s.io/controller-tools => sigs.k8s.io/controller-tools v0.2.9
)
41 changes: 41 additions & 0 deletions tekton-catalog/pipeline-loops/pkg/controller/klog.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
--- klog.go 2023-02-14 13:31:28.209488578 -0800
+++ vendor/k8s.io/klog/v2/klog.go 2023-02-14 13:33:28.081570621 -0800
@@ -401,25 +401,25 @@

// init sets up the defaults and creates command line flags.
func init() {
- commandLine.StringVar(&logging.logDir, "log_dir", "", "If non-empty, write log files in this directory (no effect when -logtostderr=true)")
- commandLine.StringVar(&logging.logFile, "log_file", "", "If non-empty, use this log file (no effect when -logtostderr=true)")
- commandLine.Uint64Var(&logging.logFileMaxSizeMB, "log_file_max_size", 1800,
+ commandLine.StringVar(&logging.logDir, "klog_dir", "", "If non-empty, write log files in this directory (no effect when -logtostderr=true)")
+ commandLine.StringVar(&logging.logFile, "klog_file", "", "If non-empty, use this log file (no effect when -logtostderr=true)")
+ commandLine.Uint64Var(&logging.logFileMaxSizeMB, "klog_file_max_size", 1800,
"Defines the maximum size a log file can grow to (no effect when -logtostderr=true). Unit is megabytes. "+
"If the value is 0, the maximum file size is unlimited.")
- commandLine.BoolVar(&logging.toStderr, "logtostderr", true, "log to standard error instead of files")
- commandLine.BoolVar(&logging.alsoToStderr, "alsologtostderr", false, "log to standard error as well as files (no effect when -logtostderr=true)")
+ commandLine.BoolVar(&logging.toStderr, "klog_tostderr", true, "log to standard error instead of files")
+ commandLine.BoolVar(&logging.alsoToStderr, "klog_alsologtostderr", false, "log to standard error as well as files (no effect when -logtostderr=true)")
logging.setVState(0, nil, false)
- commandLine.Var(&logging.verbosity, "v", "number for the log level verbosity")
- commandLine.BoolVar(&logging.addDirHeader, "add_dir_header", false, "If true, adds the file directory to the header of the log messages")
- commandLine.BoolVar(&logging.skipHeaders, "skip_headers", false, "If true, avoid header prefixes in the log messages")
- commandLine.BoolVar(&logging.oneOutput, "one_output", false, "If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)")
- commandLine.BoolVar(&logging.skipLogHeaders, "skip_log_headers", false, "If true, avoid headers when opening log files (no effect when -logtostderr=true)")
+ commandLine.Var(&logging.verbosity, "klog_v", "number for the log level verbosity")
+ commandLine.BoolVar(&logging.addDirHeader, "klog_add_dir_header", false, "If true, adds the file directory to the header of the log messages")
+ commandLine.BoolVar(&logging.skipHeaders, "klog_skip_headers", false, "If true, avoid header prefixes in the log messages")
+ commandLine.BoolVar(&logging.oneOutput, "klog_one_output", false, "If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)")
+ commandLine.BoolVar(&logging.skipLogHeaders, "klog_skip_log_headers", false, "If true, avoid headers when opening log files (no effect when -logtostderr=true)")
logging.stderrThreshold = severityValue{
Severity: severity.ErrorLog, // Default stderrThreshold is ERROR.
}
- commandLine.Var(&logging.stderrThreshold, "stderrthreshold", "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)")
- commandLine.Var(&logging.vmodule, "vmodule", "comma-separated list of pattern=N settings for file-filtered logging")
- commandLine.Var(&logging.traceLocation, "log_backtrace_at", "when logging hits line file:N, emit a stack trace")
+ commandLine.Var(&logging.stderrThreshold, "klog_stderrthreshold", "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)")
+ commandLine.Var(&logging.vmodule, "klog_vmodule", "comma-separated list of pattern=N settings for file-filtered logging")
+ commandLine.Var(&logging.traceLocation, "klog_backtrace_at", "when logging hits line file:N, emit a stack trace")

logging.settings.contextualLoggingEnabled = true
logging.flushD = newFlushDaemon(logging.lockAndFlushAll, nil)
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -178,6 +179,13 @@ func initLogger(ctx context.Context, kubeClientSet kubernetes.Interface) *zap.Su
return logger
}

func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
func NewController(namespace string) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
Expand All @@ -190,6 +198,7 @@ func NewController(namespace string) func(context.Context, configmap.Watcher) *c
logger := initLogger(ctx, kubeClientSet)
ctx = logging.WithLogger(ctx, logger)
cacheStore := initCache(ctx, kubeClientSet, params)
runKFPV2Driver := getEnv("KFPV2", "false")
c := &Reconciler{
KubeClientSet: kubeClientSet,
pipelineClientSet: pipelineClientSet,
Expand All @@ -199,6 +208,7 @@ func NewController(namespace string) func(context.Context, configmap.Watcher) *c
pipelineRunLister: pipelineRunInformer.Lister(),
cacheStore: cacheStore,
clock: clock.RealClock{},
runKFPV2Driver: strings.ToLower(runKFPV2Driver),
}

impl := customRunReconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
Expand Down
Loading

0 comments on commit b6fcff9

Please sign in to comment.