Skip to content

Commit

Permalink
Merge branch 'master' into jmcarp/api-filter
Browse files Browse the repository at this point in the history
  • Loading branch information
chensun committed Oct 28, 2021
2 parents 2fdfb61 + ec4ab2d commit 76fcebf
Show file tree
Hide file tree
Showing 26 changed files with 317 additions and 135 deletions.
66 changes: 66 additions & 0 deletions backend/src/apiserver/server/test/v2-hello-world.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{
"components": {
"comp-hello-world": {
"executorLabel": "exec-hello-world",
"inputDefinitions": {
"parameters": {
"text": {
"type": "STRING"
}
}
}
}
},
"deploymentSpec": {
"executors": {
"exec-hello-world": {
"container": {
"args": ["--text", "{{$.inputs.parameters['text']}}"],
"command": [
"sh",
"-ec",
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
"def hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Hello world', description='')\n_parser.add_argument(\"--text\", dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"
],
"image": "python:3.7"
}
}
}
},
"pipelineInfo": {
"name": "hello-world"
},
"root": {
"dag": {
"tasks": {
"hello-world": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-hello-world"
},
"inputs": {
"parameters": {
"text": {
"componentInputParameter": "text"
}
}
},
"taskInfo": {
"name": "hello-world"
}
}
}
},
"inputDefinitions": {
"parameters": {
"text": {
"type": "STRING"
}
}
}
},
"schemaVersion": "2.0.0",
"sdkVersion": "kfp-1.6.5"
}
12 changes: 7 additions & 5 deletions backend/src/apiserver/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func loadFile(fileReader io.Reader, maxFileLength int) ([]byte, error) {
return pipelineFile[:size], nil
}

func isSupportedPipelineFormat(fileName string, compressedFile []byte) bool {
return isYamlFile(fileName) || isCompressedTarballFile(compressedFile) || isZipFile(compressedFile)
}

func isYamlFile(fileName string) bool {
return strings.HasSuffix(fileName, ".yaml") || strings.HasSuffix(fileName, ".yml")
}

func isJSONFile(fileName string) bool {
return strings.HasSuffix(fileName, ".json")
}

func isPipelineYamlFile(fileName string) bool {
return fileName == "pipeline.yaml"
}
Expand Down Expand Up @@ -168,12 +168,14 @@ func ReadPipelineFile(fileName string, fileReader io.Reader, maxFileLength int)
switch {
case isYamlFile(fileName):
processedFile = pipelineFileBytes
case isJSONFile(fileName):
processedFile = pipelineFileBytes
case isZipFile(pipelineFileBytes):
processedFile, err = DecompressPipelineZip(pipelineFileBytes)
case isCompressedTarballFile(pipelineFileBytes):
processedFile, err = DecompressPipelineTarball(pipelineFileBytes)
default:
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support .zip, .tar.gz or YAML.")
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support .zip, .tar.gz, .json or YAML.")
}
if err != nil {
return nil, util.Wrap(err, "Error decompress the pipeline file")
Expand Down
9 changes: 9 additions & 0 deletions backend/src/apiserver/server/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ func TestReadPipelineFile_YAML(t *testing.T) {
assert.Equal(t, expectedFileBytes, fileBytes)
}

func TestReadPipelineFile_JSON(t *testing.T) {
file, _ := os.Open("test/v2-hello-world.json")
fileBytes, err := ReadPipelineFile("v2-hello-world.json", file, MaxFileLength)
assert.Nil(t, err)

expectedFileBytes, _ := ioutil.ReadFile("test/v2-hello-world.json")
assert.Equal(t, expectedFileBytes, fileBytes)
}

func TestReadPipelineFile_Zip(t *testing.T) {
file, _ := os.Open("test/arguments_zip/arguments-parameters.zip")
pipelineFile, err := ReadPipelineFile("arguments-parameters.zip", file, MaxFileLength)
Expand Down
31 changes: 5 additions & 26 deletions backend/src/common/client/api_server/pipeline_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package api_server
import (
"fmt"

workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/ghodss/yaml"
"github.com/go-openapi/strfmt"
apiclient "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_client"
params "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_client/pipeline_service"
Expand All @@ -19,7 +17,7 @@ type PipelineInterface interface {
Create(params *params.CreatePipelineParams) (*model.APIPipeline, error)
Get(params *params.GetPipelineParams) (*model.APIPipeline, error)
Delete(params *params.DeletePipelineParams) error
GetTemplate(params *params.GetTemplateParams) (*workflowapi.Workflow, error)
GetTemplate(params *params.GetTemplateParams) (util.Template, error)
List(params *params.ListPipelinesParams) ([]*model.APIPipeline, int, string, error)
ListAll(params *params.ListPipelinesParams, maxResultSize int) (
[]*model.APIPipeline, error)
Expand Down Expand Up @@ -138,8 +136,7 @@ func (c *PipelineClient) Delete(parameters *params.DeletePipelineParams) error {
return nil
}

func (c *PipelineClient) GetTemplate(parameters *params.GetTemplateParams) (
*workflowapi.Workflow, error) {
func (c *PipelineClient) GetTemplate(parameters *params.GetTemplateParams) (util.Template, error) {
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), apiServerDefaultTimeout)
defer cancel()
Expand All @@ -160,16 +157,7 @@ func (c *PipelineClient) GetTemplate(parameters *params.GetTemplateParams) (
}

// Unmarshal response
var workflow workflowapi.Workflow
err = yaml.Unmarshal([]byte(response.Payload.Template), &workflow)
if err != nil {
return nil, util.NewUserError(err,
fmt.Sprintf("Failed to unmarshal reponse. Params: '%+v'. Response: '%s'", parameters,
response.Payload.Template),
fmt.Sprintf("Failed to unmarshal reponse"))
}

return &workflow, nil
return util.NewTemplate([]byte(response.Payload.Template))
}

func (c *PipelineClient) List(parameters *params.ListPipelinesParams) (
Expand Down Expand Up @@ -298,7 +286,7 @@ func (c *PipelineClient) GetPipelineVersion(parameters *params.GetPipelineVersio
}

func (c *PipelineClient) GetPipelineVersionTemplate(parameters *params.GetPipelineVersionTemplateParams) (
*workflowapi.Workflow, error) {
util.Template, error) {
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), apiServerDefaultTimeout)
defer cancel()
Expand All @@ -319,14 +307,5 @@ func (c *PipelineClient) GetPipelineVersionTemplate(parameters *params.GetPipeli
}

// Unmarshal response
var workflow workflowapi.Workflow
err = yaml.Unmarshal([]byte(response.Payload.Template), &workflow)
if err != nil {
return nil, util.NewUserError(err,
fmt.Sprintf("Failed to unmarshal reponse. Params: '%+v'. Response: '%s'", parameters,
response.Payload.Template),
fmt.Sprintf("Failed to unmarshal reponse"))
}

return &workflow, nil
return util.NewTemplate([]byte(response.Payload.Template))
}
23 changes: 14 additions & 9 deletions backend/src/common/client/api_server/pipeline_client_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"path"

workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/ghodss/yaml"
"github.com/go-openapi/strfmt"
params "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_client/pipeline_service"
pipelineparams "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_client/pipeline_service"
pipelinemodel "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_model"
"github.com/kubeflow/pipelines/backend/src/common/util"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -42,13 +42,18 @@ func getDefaultWorkflow() *workflowapi.Workflow {
}}
}

func getDefaultTemplate() util.Template {
tmpl, _ := util.NewArgoTemplateFromWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
}})
return tmpl
}

func getDefaultWorkflowAsString() string {
workflow := getDefaultWorkflow()
result, err := yaml.Marshal(workflow)
if err != nil {
return "no workflow"
}
return string(result)
tmpl := getDefaultTemplate()
return string(tmpl.Bytes())
}

type PipelineClientFake struct{}
Expand Down Expand Up @@ -87,12 +92,12 @@ func (c *PipelineClientFake) Delete(params *pipelineparams.DeletePipelineParams)
}

func (c *PipelineClientFake) GetTemplate(params *pipelineparams.GetTemplateParams) (
*workflowapi.Workflow, error) {
util.Template, error) {
switch params.ID {
case PipelineForClientErrorTest:
return nil, fmt.Errorf(ClientErrorString)
default:
return getDefaultWorkflow(), nil
return getDefaultTemplate(), nil
}
}

Expand Down
5 changes: 5 additions & 0 deletions backend/src/common/util/template_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/validate"
"github.com/ghodss/yaml"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
Expand Down Expand Up @@ -164,6 +165,10 @@ func NewArgoTemplate(bytes []byte) (*ArgoTemplate, error) {
return &ArgoTemplate{wf}, nil
}

func NewArgoTemplateFromWorkflow(wf *workflowapi.Workflow) (*ArgoTemplate, error) {
return &ArgoTemplate{wf: &Workflow{wf}}, nil
}

func (t *ArgoTemplate) Bytes() []byte {
if t == nil {
return nil
Expand Down
Loading

0 comments on commit 76fcebf

Please sign in to comment.