This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathlaunch_plan.go
133 lines (120 loc) · 3.99 KB
/
launch_plan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package transformers
import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flyteadmin/pkg/repositories/models"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"google.golang.org/grpc/codes"
)
// CreateLaunchPlan builds an admin.LaunchPlan from a create request.
//
// The returned launch plan reuses the request's Id and Spec pointers and gets a
// new closure whose ExpectedInputs are the spec's DefaultInputs and whose
// ExpectedOutputs are the supplied expectedOutputs.
//
// A request without a Spec produces a closure with nil ExpectedInputs rather
// than a nil-pointer panic.
func CreateLaunchPlan(
	request admin.LaunchPlanCreateRequest,
	expectedOutputs *core.VariableMap) admin.LaunchPlan {
	return admin.LaunchPlan{
		Id:   request.Id,
		Spec: request.Spec,
		Closure: &admin.LaunchPlanClosure{
			// The generated getter is safe on a nil Spec, unlike direct field access.
			ExpectedInputs:  request.Spec.GetDefaultInputs(),
			ExpectedOutputs: expectedOutputs,
		},
	}
}
// CreateLaunchPlanModel transforms an admin.LaunchPlan object to a LaunchPlan model.
//
// The spec and closure are stored in serialized protobuf form and the
// identifier fields are copied into the model key. The schedule type is derived
// from the schedule in the spec's entity metadata: CRON when a non-empty cron
// expression is set, RATE when a rate is set, and NONE otherwise.
//
// workflowRepoID, digest and initState are copied into the model's WorkflowID,
// Digest and State fields respectively.
//
// It returns an Internal error when the spec or closure cannot be serialized
// and an InvalidArgument error when the launch plan has no Id.
func CreateLaunchPlanModel(
	launchPlan admin.LaunchPlan,
	workflowRepoID uint,
	digest []byte,
	initState admin.LaunchPlanState) (models.LaunchPlan, error) {
	spec, err := proto.Marshal(launchPlan.Spec)
	if err != nil {
		return models.LaunchPlan{}, errors.NewFlyteAdminError(codes.Internal, "Failed to serialize launch plan spec")
	}
	closure, err := proto.Marshal(launchPlan.Closure)
	if err != nil {
		return models.LaunchPlan{}, errors.NewFlyteAdminError(codes.Internal, "Failed to serialize launch plan closure")
	}

	// Checked after serialization so that inputs which already produced a
	// serialization error keep producing that same error; only the former
	// nil-pointer panic on a missing Id is replaced.
	if launchPlan.Id == nil {
		return models.LaunchPlan{}, errors.NewFlyteAdminError(codes.InvalidArgument, "missing launch plan id")
	}

	scheduleType := models.LaunchPlanScheduleTypeNONE
	// The getter chain is nil-safe, so absent metadata or schedule falls through to NONE.
	if schedule := launchPlan.Spec.GetEntityMetadata().GetSchedule(); schedule != nil {
		switch {
		case schedule.GetCronExpression() != "":
			scheduleType = models.LaunchPlanScheduleTypeCRON
		case schedule.GetRate() != nil:
			scheduleType = models.LaunchPlanScheduleTypeRATE
		}
	}

	// The model stores the state as a pointer, so take the address of a local copy.
	state := int32(initState)
	return models.LaunchPlan{
		LaunchPlanKey: models.LaunchPlanKey{
			Project: launchPlan.Id.Project,
			Domain:  launchPlan.Id.Domain,
			Name:    launchPlan.Id.Name,
			Version: launchPlan.Id.Version,
		},
		Spec:         spec,
		State:        &state,
		Closure:      closure,
		WorkflowID:   workflowRepoID,
		Digest:       digest,
		ScheduleType: scheduleType,
	}, nil
}
// FromLaunchPlanModel transforms a LaunchPlan model into an admin.LaunchPlan.
//
// The spec and closure are deserialized from the model's stored bytes. The
// closure's CreatedAt and UpdatedAt are then always overwritten with the
// model's timestamps, and its State is overwritten when the model carries a
// non-nil State. The identifier is rebuilt from the model's project, domain,
// name and version with resource type LAUNCH_PLAN.
//
// It returns an Internal error when deserialization or timestamp conversion fails.
func FromLaunchPlanModel(model models.LaunchPlan) (*admin.LaunchPlan, error) {
	spec := &admin.LaunchPlanSpec{}
	if err := proto.Unmarshal(model.Spec, spec); err != nil {
		return nil, errors.NewFlyteAdminError(codes.Internal, "failed to unmarshal spec")
	}

	closure := &admin.LaunchPlanClosure{}
	if err := proto.Unmarshal(model.Closure, closure); err != nil {
		return nil, errors.NewFlyteAdminError(codes.Internal, "failed to unmarshal closure")
	}

	// Constant messages use the non-format constructor, matching the unmarshal
	// errors above and avoiding accidental interpretation of format verbs.
	createdAt, err := ptypes.TimestampProto(model.CreatedAt)
	if err != nil {
		return nil, errors.NewFlyteAdminError(codes.Internal, "failed to read created at")
	}
	updatedAt, err := ptypes.TimestampProto(model.UpdatedAt)
	if err != nil {
		return nil, errors.NewFlyteAdminError(codes.Internal, "failed to read updated at")
	}

	// Values held on the model take precedence over the serialized closure.
	if model.State != nil {
		closure.State = admin.LaunchPlanState(*model.State)
	}
	closure.CreatedAt = createdAt
	closure.UpdatedAt = updatedAt

	return &admin.LaunchPlan{
		Id: &core.Identifier{
			ResourceType: core.ResourceType_LAUNCH_PLAN,
			Project:      model.Project,
			Domain:       model.Domain,
			Name:         model.Name,
			Version:      model.Version,
		},
		Spec:    spec,
		Closure: closure,
	}, nil
}
// FromLaunchPlanModels converts each model in launchPlanModels with
// FromLaunchPlanModel, preserving order.
//
// The first conversion failure aborts the whole call: the error is returned
// unchanged together with a nil slice.
func FromLaunchPlanModels(launchPlanModels []models.LaunchPlan) ([]*admin.LaunchPlan, error) {
	converted := make([]*admin.LaunchPlan, 0, len(launchPlanModels))
	for i := range launchPlanModels {
		item, convErr := FromLaunchPlanModel(launchPlanModels[i])
		if convErr != nil {
			return nil, convErr
		}
		converted = append(converted, item)
	}
	return converted, nil
}
// FromLaunchPlanModelsToIdentifiers maps each launch plan model to an
// admin.NamedEntityIdentifier holding its project, domain and name.
//
// The result has one entry per input model, in the same order.
func FromLaunchPlanModelsToIdentifiers(launchPlanModels []models.LaunchPlan) []*admin.NamedEntityIdentifier {
	identifiers := make([]*admin.NamedEntityIdentifier, 0, len(launchPlanModels))
	for idx := range launchPlanModels {
		current := launchPlanModels[idx]
		identifier := &admin.NamedEntityIdentifier{
			Project: current.Project,
			Domain:  current.Domain,
			Name:    current.Name,
		}
		identifiers = append(identifiers, identifier)
	}
	return identifiers
}