This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
workflow.go
82 lines (75 loc) · 2.61 KB
/
workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package transformers
import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flyteadmin/pkg/repositories/models"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"google.golang.org/grpc/codes"
)
// CreateWorkflowModel transforms a WorkflowCreateRequest into a workflow model.
//
// When the request carries a typed interface at Spec.Template.Interface, it is
// stored on the model in serialized protobuf form; otherwise TypedInterface is
// left nil. remoteClosureIdentifier and digest are copied onto the model
// unchanged.
//
// An InvalidArgument error is returned when the request has no identifier, and
// an Internal error is returned when the typed interface cannot be serialized.
func CreateWorkflowModel(request admin.WorkflowCreateRequest, remoteClosureIdentifier string,
	digest []byte) (models.Workflow, error) {
	// The key fields below are read straight off request.Id; reject a missing
	// identifier up front rather than panicking on a nil dereference.
	if request.Id == nil {
		return models.Workflow{}, errors.NewFlyteAdminError(codes.InvalidArgument, "missing workflow identifier")
	}

	var typedInterface []byte
	if request.Spec != nil && request.Spec.Template != nil && request.Spec.Template.Interface != nil {
		serializedTypedInterface, err := proto.Marshal(request.Spec.Template.Interface)
		if err != nil {
			// Surface the underlying cause so the failure can be diagnosed.
			return models.Workflow{}, errors.NewFlyteAdminErrorf(codes.Internal,
				"Failed to serialize workflow spec: %v", err)
		}
		typedInterface = serializedTypedInterface
	}

	return models.Workflow{
		WorkflowKey: models.WorkflowKey{
			Project: request.Id.Project,
			Domain:  request.Id.Domain,
			Name:    request.Id.Name,
			Version: request.Id.Version,
		},
		TypedInterface:          typedInterface,
		RemoteClosureIdentifier: remoteClosureIdentifier,
		Digest:                  digest,
	}, nil
}
// FromWorkflowModel converts a workflow model into an admin.Workflow.
//
// Only the identifier and the closure's creation timestamp are populated. An
// Internal error is returned if the model's CreatedAt value cannot be
// converted to a protobuf timestamp.
func FromWorkflowModel(workflowModel models.Workflow) (admin.Workflow, error) {
	creationTime, err := ptypes.TimestampProto(workflowModel.CreatedAt)
	if err != nil {
		return admin.Workflow{}, errors.NewFlyteAdminErrorf(codes.Internal, "failed to read created at timestamp")
	}

	key := workflowModel.WorkflowKey
	// Because the spec is offloaded, it is not populated in the workflow returned here.
	return admin.Workflow{
		Id: &core.Identifier{
			ResourceType: core.ResourceType_WORKFLOW,
			Project:      key.Project,
			Domain:       key.Domain,
			Name:         key.Name,
			Version:      key.Version,
		},
		Closure: &admin.WorkflowClosure{CreatedAt: creationTime},
	}, nil
}
// FromWorkflowModels converts each workflow model, in order, using
// FromWorkflowModel and returns pointers to the converted workflows.
//
// The first conversion error aborts the whole call: a nil slice is returned
// together with that error.
func FromWorkflowModels(workflowModels []models.Workflow) ([]*admin.Workflow, error) {
	converted := make([]*admin.Workflow, 0, len(workflowModels))
	for i := range workflowModels {
		// item is declared per iteration, so every stored pointer is distinct.
		item, err := FromWorkflowModel(workflowModels[i])
		if err != nil {
			return nil, err
		}
		converted = append(converted, &item)
	}
	return converted, nil
}
// FromWorkflowModelsToIdentifiers builds one NamedEntityIdentifier per workflow
// model, in input order, carrying the model's project, domain and name. The
// version is not part of the resulting identifier.
func FromWorkflowModelsToIdentifiers(workflowModels []models.Workflow) []*admin.NamedEntityIdentifier {
	identifiers := make([]*admin.NamedEntityIdentifier, 0, len(workflowModels))
	for idx := range workflowModels {
		identifiers = append(identifiers, &admin.NamedEntityIdentifier{
			Project: workflowModels[idx].Project,
			Domain:  workflowModels[idx].Domain,
			Name:    workflowModels[idx].Name,
		})
	}
	return identifiers
}