This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
/
fail_fast.go
68 lines (55 loc) · 2.11 KB
/
fail_fast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package bundle
import (
"context"
"fmt"
"time"
"github.com/lyft/flyteplugins/go/tasks/errors"
pluginMachinery "github.com/lyft/flyteplugins/go/tasks/pluginmachinery"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)
const failFastExecutorName = "fail-fast"
type failFastHandler struct{}
func (h failFastHandler) GetID() string {
return failFastExecutorName
}
func (h failFastHandler) GetProperties() core.PluginProperties {
return core.PluginProperties{}
}
func (h failFastHandler) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
occuredAt := time.Now()
taskTemplate, err := tCtx.TaskReader().Read(ctx)
if err != nil {
return core.UnknownTransition,
errors.Errorf(errors.BadTaskSpecification, "unable to fetch task specification [%v]", err.Error())
}
return core.DoTransition(core.PhaseInfoFailure("AlwaysFail",
fmt.Sprintf("Task [%s] type [%+v] not supported by platform for this project/domain/workflow",
taskTemplate.Type, tCtx.TaskExecutionMetadata().GetTaskExecutionID()), &core.TaskInfo{
OccurredAt: &occuredAt,
})), nil
}
func (h failFastHandler) Abort(_ context.Context, _ core.TaskExecutionContext) error {
return nil
}
func (h failFastHandler) Finalize(_ context.Context, _ core.TaskExecutionContext) error {
return nil
}
func failFastPluginLoader(_ context.Context, _ core.SetupContext) (core.Plugin, error) {
return &failFastHandler{}, nil
}
func init() {
// TODO(katrogan): Once we move pluginmachinery to flyteidl make these task types named constants that flyteplugins
// can reference in other handler definitions.
// NOTE: these should match the constants defined flytekit
taskTypes := []core.TaskType{
"container", "sidecar", "container_array", "hive", "presto", "spark", "pytorch",
"sagemaker_custom_training_job_task", "sagemaker_training_job_task", "sagemaker_hyperparameter_tuning_job_task",
}
pluginMachinery.PluginRegistry().RegisterCorePlugin(
core.PluginEntry{
ID: failFastExecutorName,
RegisteredTaskTypes: taskTypes,
LoadPlugin: failFastPluginLoader,
IsDefault: false,
})
}