This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathworkflow_validator.go
63 lines (56 loc) · 2.44 KB
/
workflow_validator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package validation
import (
"context"
"github.com/golang/protobuf/proto"
"k8s.io/apimachinery/pkg/api/resource"
"github.com/lyft/flyteadmin/pkg/repositories"
"github.com/lyft/flyteadmin/pkg/common"
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flyteadmin/pkg/manager/impl/shared"
runtime "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
"google.golang.org/grpc/codes"
)
const numSystemNodes = 2 // A workflow graph always has a start and end node injected by the platform.
func ValidateWorkflow(
ctx context.Context, request admin.WorkflowCreateRequest, db repositories.RepositoryInterface,
config runtime.ApplicationConfiguration) error {
if err := ValidateIdentifier(request.Id, common.Workflow); err != nil {
return err
}
if err := ValidateProjectAndDomain(ctx, db, config, request.Id.Project, request.Id.Domain); err != nil {
return err
}
if request.Spec == nil || request.Spec.Template == nil {
return shared.GetMissingArgumentError(shared.Spec)
}
return nil
}
func ValidateCompiledWorkflow(identifier core.Identifier, workflow admin.WorkflowClosure, config runtime.RegistrationValidationConfiguration) error {
if len(config.GetWorkflowSizeLimit()) > 0 {
workflowSizeLimit := resource.MustParse(config.GetWorkflowSizeLimit())
workflowSizeValue := resource.NewQuantity(int64(proto.Size(&workflow)), resource.DecimalExponent)
if workflowSizeLimit.Cmp(*workflowSizeValue) <= -1 {
return errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"Workflow closure size exceeds max limit [%v]", config.GetWorkflowSizeLimit())
}
}
if config.GetWorkflowNodeLimit() == 0 {
// Treat this is unset. There is no limit to compare against.
return nil
}
if workflow.CompiledWorkflow == nil || workflow.CompiledWorkflow.Primary == nil ||
workflow.CompiledWorkflow.Primary.Template == nil || workflow.CompiledWorkflow.Primary.Template.Nodes == nil {
logger.Warningf(context.Background(), "workflow [%+v] did not have any primary nodes", identifier)
return nil
}
numUserNodes := len(workflow.CompiledWorkflow.Primary.Template.Nodes) - numSystemNodes
if numUserNodes > config.GetWorkflowNodeLimit() {
return errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"number of nodes in workflow [%+v] exceeds limit (%v > %v)", identifier,
numUserNodes, config.GetWorkflowNodeLimit())
}
return nil
}