-
Notifications
You must be signed in to change notification settings - Fork 1.9k
feat(backend): Merge kfp-tekton backend code #10678
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
/hold Still working on the tests |
|
|
||
| // TODO: Verify/add support for file:///. | ||
| if ms[1] != "gs://" && ms[1] != "s3://" && ms[1] != "minio://" { | ||
| if ms[1] != "gs://" && ms[1] != "s3://" && ms[1] != "minio://" && ms[1] != "mem://" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mem:// here is to support in memory storage for launcher tests under environments that doesn't have access to public gcs endpoints
| "google.golang.org/grpc/metadata" | ||
|
|
||
| api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client" | ||
| apiv2 "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We move report client to use the new v2 api
| numWorkerName = "numWorker" | ||
| clientQPSFlagName = "clientQPS" | ||
| clientBurstFlagName = "clientBurst" | ||
| executionTypeFlagName = "executionType" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new flag is for changing executionType, default is Argo
|
/test kubeflow-pipeline-e2e-test |
1 similar comment
|
/test kubeflow-pipeline-e2e-test |
Tomcli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rimolive for resolving the tests. I don't have access the the backend test, but it seems like only the api integration test is failing.
|
The failure are on jobs, aka scheduled workflow: Not sure how you tested this locally, consider adding debug logs to the test functions? |
Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>
|
/unhold @Tomcli @chensun Finally I got the tests passing but unfortunately I needed to follow Tommy's recommentation to remove the scheduled workflow and rethink the kfp-tekton code for that. Feel free to lgtm/approve, but take into consideration that this will be a partial feature. I believe it's not a big deal given that historically we could handle partial feature implementations to make progress. |
Tomcli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @rimolive for fixing some of the tests and dependencies.
/lgtm
For the scheduledworkflow, we probably need to also update the api test to use the new swf v2 client. Let's do that in a follow up PR so it won't block the core KFP feature.
@rimolive Just to confirm, by partial feature you meant scheduledworkflow doesn't work on kfp-tekton yet, but there's no regression to the feature on kfp-argo, right? |
Yes, there's no regression on all the kfp-argo feature. |
| ENV NUM_WORKERS 2 | ||
| ENV LOG_LEVEL info | ||
|
|
||
| CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} --ttlSecondsAfterWorkflowFinish=${TTL_SECONDS_AFTER_WORKFLOW_FINISH} --numWorker ${NUM_WORKERS} --logLevel=${LOG_LEVEL} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be an unintentional change? I see the default value remains the same, but we should keep this customization capability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch! I will rollback that change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this line is completely wrong, it should have the EXECUTIONTYPE env var set by default and send as a flag. I'm fixing it.
| // k8s.io/client-go/rest/config.go#RESTClientFor | ||
| flag.Float64Var(&clientQPS, clientQPSFlagName, 5, "The maximum QPS to the master from this client.") | ||
| flag.IntVar(&clientBurst, clientBurstFlagName, 10, "Maximum burst for throttle from this client.") | ||
| flag.StringVar(&executionType, executionTypeFlagName, "Workflow", "Custom Resource's name of the backend Orchestration Engine") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Workflow is too generic as the executionType, should we name the default "ArgoWorkflow" instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code uses the kind field of the pipeline to find its execution type. Changing to ArgoWorkflow would break the code.
We can think of a better way to do this, maybe using annotation. However, I think this is out of scope of this PR.
|
|
||
| } | ||
|
|
||
| // TODO: wait for up stream to officially update to v2beta1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: The comment needs some update--there's no upstream once this is merged in to kfp repro.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But v2beta1 is already released, right? If so, we can just remove the comment.
| return nil, util.NewInternalServerError(util.NewInvalidInputError("ScheduledWorkflow doesn't exist: %s", job.K8SName), "Failed to create a run due to invalid name") | ||
| } | ||
| executionSpec.SetOwnerReferences(swf) | ||
| // executionSpec.SetExecutionName(run.Description) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove?
| if executionSpecStr, ok := wfr.Spec.(string); ok { | ||
| return NewPipelineRunFromScheduleWorkflowSpecBytesJSON([]byte(executionSpecStr)) | ||
| } | ||
| // fall back to Argo WorkflowSpec, need to marshal back to json string then unmarshal to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: update comment
| assert.Empty(t, execSpec) | ||
| assert.Error(t, err) | ||
| assert.EqualError(t, err, "InternalServerError: type:PipelineRun: ExecutionType is not supported") | ||
| assert.EqualError(t, err, "Invalid input error: not PipelineRun struct") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: there should be a better error message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It follows the same message for Argo workflow: https://github.com/kubeflow/pipelines/blob/master/backend/src/common/util/workflow.go#L94
| // Get converts this object to a workflowapi.Workflow. | ||
| // func (pr *PipelineRun) Get() *pipelineapi.PipelineRun { | ||
| // return pr.PipelineRun | ||
| // } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove
| // if newWF.Spec.ActiveDeadlineSeconds != nil && *newWF.Spec.ActiveDeadlineSeconds == 0 { | ||
| // // if it was terminated, unset the deadline | ||
| // newWF.Spec.ActiveDeadlineSeconds = nil | ||
| // } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove all the dead code.
| func readTaskRunMetricsJSONOrEmpty( | ||
| runID string, nodeStatus pipelineapi.PipelineRunTaskRunStatus, | ||
| retrieveArtifact RetrieveArtifact) (string, error) { | ||
| // Tekton doesn't support any artifact spec, artifact records are done by our custom metadata writers: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto: remove dead code
Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>
|
@rimolive: The following test failed, say
DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here. |
chensun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/approve
Thanks!
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: chensun, Tomcli The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Description of your changes:
Closes #10488
Merge all the kfp-tekton compiler code to pipelines.
Checklist: