This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
/
container_helper.go
executable file
·134 lines (110 loc) · 4.99 KB
/
container_helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package flytek8s
import (
"context"
"regexp"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/rand"
"github.com/lyft/flyteplugins/go/tasks/errors"
pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/utils"
)
// isAcceptableK8sName validates container names against the DNS-1123 label standard.
// BUG FIX: the pattern is now anchored with ^...$ — the previous unanchored form
// matched any string containing a single lowercase alphanumeric character, so
// invalid names (e.g. "My_Task") slipped through the check in ToK8sContainer.
// MustCompile also replaces regexp.Compile with a silently discarded error.
var isAcceptableK8sName = regexp.MustCompile("^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")

// resourceGPU is the generic GPU resource name users may specify; it is rewritten
// to ResourceNvidiaGPU by ApplyResourceOverrides.
const resourceGPU = "gpu"

// ResourceNvidiaGPU is the name of the Nvidia GPU resource.
// Copied from: k8s.io/autoscaler/cluster-autoscaler/utils/gpu/gpu.go
const ResourceNvidiaGPU = "nvidia.com/gpu"
// ApplyResourceOverrides normalizes the given resource requirements:
//   - missing CPU/memory requests default to the corresponding limit if set,
//     otherwise to the configured platform default;
//   - missing CPU/memory limits are set equal to the (now-populated) request;
//   - storage / ephemeral-storage entries are stripped (unsupported);
//   - the generic "gpu" resource name is rewritten to the Nvidia-specific one.
//
// The ResourceRequirements struct is passed by value, but the contained
// ResourceList maps are shared with the caller and are mutated in place.
func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequirements) *v1.ResourceRequirements {
	if len(resources.Requests) == 0 {
		resources.Requests = make(v1.ResourceList)
	}
	if len(resources.Limits) == 0 {
		resources.Limits = make(v1.ResourceList)
	}

	if _, requested := resources.Requests[v1.ResourceCPU]; !requested {
		// Use the CPU limit if set, else fall back to the configured default.
		if limit, limitSet := resources.Limits[v1.ResourceCPU]; limitSet {
			resources.Requests[v1.ResourceCPU] = limit
		} else {
			resources.Requests[v1.ResourceCPU] = resource.MustParse(config.GetK8sPluginConfig().DefaultCPURequest)
		}
	}

	if _, requested := resources.Requests[v1.ResourceMemory]; !requested {
		// Use the memory limit if set, else fall back to the configured default.
		// BUG FIX: this previously inspected the CPU limit (v1.ResourceCPU), so the
		// memory request could be set to a zero value or wrongly defaulted.
		if limit, limitSet := resources.Limits[v1.ResourceMemory]; limitSet {
			resources.Requests[v1.ResourceMemory] = limit
		} else {
			resources.Requests[v1.ResourceMemory] = resource.MustParse(config.GetK8sPluginConfig().DefaultMemoryRequest)
		}
	}

	if _, found := resources.Limits[v1.ResourceCPU]; !found {
		logger.Infof(ctx, "found cpu limit missing, setting limit to the requested value %v", resources.Requests[v1.ResourceCPU])
		resources.Limits[v1.ResourceCPU] = resources.Requests[v1.ResourceCPU]
	}
	if _, found := resources.Limits[v1.ResourceMemory]; !found {
		logger.Infof(ctx, "found memory limit missing, setting limit to the requested value %v", resources.Requests[v1.ResourceMemory])
		resources.Limits[v1.ResourceMemory] = resources.Requests[v1.ResourceMemory]
	}

	// TODO: Make configurable. 1/15/2019 Flyte Cluster doesn't support setting storage requests/limits.
	// https://github.com/kubernetes/enhancements/issues/362
	delete(resources.Requests, v1.ResourceStorage)
	delete(resources.Requests, v1.ResourceEphemeralStorage)
	delete(resources.Limits, v1.ResourceStorage)
	delete(resources.Limits, v1.ResourceEphemeralStorage)

	// Rewrite the generic "gpu" resource name to the Nvidia-specific one.
	if res, found := resources.Requests[resourceGPU]; found {
		resources.Requests[ResourceNvidiaGPU] = res
		delete(resources.Requests, resourceGPU)
	}
	if res, found := resources.Limits[resourceGPU]; found {
		resources.Limits[ResourceNvidiaGPU] = res
		// BUG FIX: this previously deleted from Requests, leaving a stale "gpu"
		// key in Limits alongside the new nvidia.com/gpu entry.
		delete(resources.Limits, resourceGPU)
	}
	return &resources
}
// ToK8sContainer returns a K8s Container for the execution. Command and argument
// templates are substituted via the input reader / output paths, environment
// variables are decorated with the task execution ID, and the resource overrides
// (which must be present) are normalized through ApplyResourceOverrides.
//
// The container is named after the generated pod name unless that name violates
// the DNS-1123 standard (or exceeds 63 characters), in which case a short random
// name is used instead.
//
// Returns a BadTaskSpecification error when overrides or resource requirements
// are missing, or any error from template substitution.
func ToK8sContainer(ctx context.Context, taskExecutionMetadata pluginsCore.TaskExecutionMetadata, taskContainer *core.Container,
	inputReader io.InputReader, outputPaths io.OutputFilePaths) (*v1.Container, error) {
	modifiedCommand, err := utils.ReplaceTemplateCommandArgs(ctx, taskContainer.GetCommand(), inputReader, outputPaths)
	if err != nil {
		return nil, err
	}

	modifiedArgs, err := utils.ReplaceTemplateCommandArgs(ctx, taskContainer.GetArgs(), inputReader, outputPaths)
	if err != nil {
		return nil, err
	}

	envVars := DecorateEnvVars(ctx, ToK8sEnvVar(taskContainer.GetEnv()), taskExecutionMetadata.GetTaskExecutionID())

	// Overrides and resource requirements are mandatory; their absence indicates a
	// platform/compiler error upstream. (The previous version checked GetOverrides()
	// for nil twice and re-checked res for nil after a guaranteed-non-nil path.)
	if taskExecutionMetadata.GetOverrides() == nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "platform/compiler error, overrides not set for task")
	}
	if taskExecutionMetadata.GetOverrides().GetResources() == nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!")
	}
	res := ApplyResourceOverrides(ctx, *taskExecutionMetadata.GetOverrides().GetResources())

	// Make the container name the same as the pod name, unless it violates K8s
	// naming conventions. Container names are subject to the DNS-1123 standard.
	containerName := taskExecutionMetadata.GetTaskExecutionID().GetGeneratedName()
	if !isAcceptableK8sName.MatchString(containerName) || len(containerName) > 63 {
		containerName = rand.String(4)
	}

	return &v1.Container{
		Name:                     containerName,
		Image:                    taskContainer.GetImage(),
		Args:                     modifiedArgs,
		Command:                  modifiedCommand,
		Env:                      envVars,
		TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
		Resources:                *res,
	}, nil
}