This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
/
resourcemanager.go
130 lines (108 loc) · 5.22 KB
/
resourcemanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package resourcemanager
import (
"context"
"fmt"
"sync"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flytestdlib/promutils"
)
// TokenPrefix is the scoping prefix that gets placed in front of an
// allocation token. Its parts are joined with execUrnSeparator (':').
type TokenPrefix string

// Token is an allocation token in string form.
type Token string

const (
	// execUrnPrefix is the first part of every token prefix.
	execUrnPrefix = "ex"
	// execUrnSeparator joins the individual parts of a TokenPrefix.
	execUrnSeparator = ":"
	// tokenNamespaceSeparator joins a TokenPrefix and the token it scopes.
	tokenNamespaceSeparator = "-"
)

// prepend returns a new Token made of prefix, the '-' separator and the
// receiver, in that order. The receiver itself is not modified.
func (t Token) prepend(prefix TokenPrefix) Token {
	joined := fmt.Sprintf("%s%s%s", prefix, tokenNamespaceSeparator, t)
	return Token(joined)
}

// extend returns a new TokenPrefix made of the receiver, the ':' separator
// and prefixPart, in that order. The result is still a TokenPrefix, so calls
// can be chained.
func (t TokenPrefix) extend(prefixPart string) TokenPrefix {
	extended := fmt.Sprintf("%s%s%s", t, execUrnSeparator, prefixPart)
	return TokenPrefix(extended)
}
// composeProjectScopePrefix builds the project-level token prefix: the
// execUrnPrefix extended with the project of the execution that id refers to.
func composeProjectScopePrefix(id *core.TaskExecutionIdentifier) TokenPrefix {
	project := id.GetNodeExecutionId().GetExecutionId().GetProject()
	root := TokenPrefix(execUrnPrefix)
	return root.extend(project)
}
// composeNamespaceScopePrefix builds the namespace-level token prefix: the
// project-scope prefix extended with the domain of the execution that id
// refers to.
func composeNamespaceScopePrefix(id *core.TaskExecutionIdentifier) TokenPrefix {
	domain := id.GetNodeExecutionId().GetExecutionId().GetDomain()
	projectPrefix := composeProjectScopePrefix(id)
	return projectPrefix.extend(domain)
}
// composeExecutionScopePrefix builds the execution-level token prefix: the
// namespace-scope prefix extended with the name of the execution that id
// refers to.
func composeExecutionScopePrefix(id *core.TaskExecutionIdentifier) TokenPrefix {
	executionName := id.GetNodeExecutionId().GetExecutionId().GetName()
	namespacePrefix := composeNamespaceScopePrefix(id)
	return namespacePrefix.extend(executionName)
}
// ComposeTokenPrefix returns the execution-scope token prefix for id.
//
// The token prefix is a required part of the token. We leverage the prefix to
// achieve project-level and namespace-level capping.
func ComposeTokenPrefix(id *core.TaskExecutionIdentifier) TokenPrefix {
	executionPrefix := composeExecutionScopePrefix(id)
	return executionPrefix
}
// Resource is designed to serve as the identifier of a user of the resource
// manager. It groups a quota, a metrics handle and a concurrent map of
// rejected tokens.
type Resource struct {
// quota is the BaseResourceConstraint attached to this resource.
quota BaseResourceConstraint
// metrics gives access to the promutils.Scope used for this resource.
metrics Metrics
// rejectedTokens is a sync.Map; presumably it tracks tokens whose allocation
// was rejected — TODO confirm key/value types against the implementations.
rejectedTokens sync.Map
}
// Metrics is the minimal interface a metrics holder must satisfy: it exposes
// a promutils.Scope.
type Metrics interface {
// GetScope returns the promutils.Scope associated with these metrics.
GetScope() promutils.Scope
}
// Builder describes a component that can hand out resource registrars and
// then build a BaseResourceManager.
type Builder interface {
// GetID returns the string identifier of this builder.
GetID() string
// GetResourceRegistrar returns a pluginCore.ResourceRegistrar for the given
// namespace prefix.
GetResourceRegistrar(namespacePrefix pluginCore.ResourceNamespace) pluginCore.ResourceRegistrar
// BuildResourceManager constructs the BaseResourceManager, or returns an
// error if it cannot be built.
BuildResourceManager(ctx context.Context) (BaseResourceManager, error)
}
// Proxy is created for each TaskExecutionContext.
// The Proxy of an execution contains the resource namespace prefix (e.g., "qubole-hive-executor" for a hive task)
// and the identifier from which the token prefix (e.g., ex:<project>:<domain>:<exec_id>) of that execution is derived.
// Plugins only have access to a Proxy, not directly to the underlying resource manager.
// The Proxy prepends the proper prefixes to the resource namespace and to the allocation token.
type Proxy struct {
// pluginCore.ResourceManager
// BaseResourceManager is the underlying manager that calls are forwarded to.
BaseResourceManager
// ResourceNamespacePrefix is the prefix under which every namespace passed
// to this Proxy is turned into a sub-namespace.
ResourceNamespacePrefix pluginCore.ResourceNamespace
// ExecutionIdentifier identifies the task execution; the token prefix is
// composed from it.
ExecutionIdentifier *core.TaskExecutionIdentifier
}
// ComposeResourceConstraint turns spec into a list of fully qualified
// resource constraints for this Proxy's execution. A project-scope entry is
// added when spec.ProjectScopeResourceConstraint is set, followed by a
// namespace-scope entry when spec.NamespaceScopeResourceConstraint is set.
// The returned slice is never nil; it is empty when neither is set.
func (p Proxy) ComposeResourceConstraint(spec pluginCore.ResourceConstraintsSpec) []FullyQualifiedResourceConstraint {
	execID := p.ExecutionIdentifier
	constraints := []FullyQualifiedResourceConstraint{}

	if spec.ProjectScopeResourceConstraint != nil {
		projectScoped := composeFullyQualifiedProjectScopeResourceConstraint(spec, execID)
		constraints = append(constraints, projectScoped)
	}

	if spec.NamespaceScopeResourceConstraint != nil {
		namespaceScoped := composeFullyQualifiedNamespaceScopeResourceConstraint(spec, execID)
		constraints = append(constraints, namespaceScoped)
	}

	return constraints
}
// AllocateResource forwards an allocation request to the underlying
// BaseResourceManager. Before forwarding, it composes the fully qualified
// constraints from constraintsSpec, makes namespace a sub-namespace of the
// Proxy's ResourceNamespacePrefix, and prepends the execution's token prefix
// to allocationToken. The status and error of the underlying call are
// returned unchanged.
func (p Proxy) AllocateResource(ctx context.Context, namespace pluginCore.ResourceNamespace,
	allocationToken string, constraintsSpec pluginCore.ResourceConstraintsSpec) (pluginCore.AllocationStatus, error) {
	constraints := p.ComposeResourceConstraint(constraintsSpec)
	qualifiedNamespace := p.ResourceNamespacePrefix.CreateSubNamespace(namespace)
	qualifiedToken := Token(allocationToken).prepend(ComposeTokenPrefix(p.ExecutionIdentifier))

	return p.BaseResourceManager.AllocateResource(ctx, qualifiedNamespace, qualifiedToken, constraints)
}
// ReleaseResource forwards a release request to the underlying
// BaseResourceManager. namespace is made a sub-namespace of the Proxy's
// ResourceNamespacePrefix and allocationToken gets the execution's token
// prefix prepended, mirroring what AllocateResource does. The error of the
// underlying call is returned unchanged.
func (p Proxy) ReleaseResource(ctx context.Context, namespace pluginCore.ResourceNamespace,
	allocationToken string) error {
	qualifiedNamespace := p.ResourceNamespacePrefix.CreateSubNamespace(namespace)
	qualifiedToken := Token(allocationToken).prepend(ComposeTokenPrefix(p.ExecutionIdentifier))

	return p.BaseResourceManager.ReleaseResource(ctx, qualifiedNamespace, qualifiedToken)
}
// GetTaskResourceManager wraps r in a Proxy bound to the given resource
// namespace prefix and task execution identifier, and returns it as a
// pluginCore.ResourceManager.
func GetTaskResourceManager(r BaseResourceManager, resourceNamespacePrefix pluginCore.ResourceNamespace,
	id *core.TaskExecutionIdentifier) pluginCore.ResourceManager {
	var proxy Proxy
	proxy.BaseResourceManager = r
	proxy.ResourceNamespacePrefix = resourceNamespacePrefix
	proxy.ExecutionIdentifier = id
	return proxy
}
// ResourceRegistrarProxy wraps a pluginCore.ResourceRegistrar and prepends a
// proper prefix to the resource namespace of every registration.
type ResourceRegistrarProxy struct {
// ResourceRegistrar is the underlying registrar that calls are forwarded to.
pluginCore.ResourceRegistrar
// ResourceNamespacePrefix is the prefix under which every namespace passed
// to this proxy is turned into a sub-namespace.
ResourceNamespacePrefix pluginCore.ResourceNamespace
}
// RegisterResourceQuota registers quota with the underlying registrar, using
// namespace as a sub-namespace of the proxy's ResourceNamespacePrefix. The
// error of the underlying call is returned unchanged.
func (p ResourceRegistrarProxy) RegisterResourceQuota(ctx context.Context, namespace pluginCore.ResourceNamespace, quota int) error {
	qualifiedNamespace := p.ResourceNamespacePrefix.CreateSubNamespace(namespace)
	return p.ResourceRegistrar.RegisterResourceQuota(ctx, qualifiedNamespace, quota)
}
// BaseResourceManager is the interface of the underlying resource manager
// that Proxy forwards to. Unlike the plugin-facing calls on Proxy, it takes a
// Token and a list of FullyQualifiedResourceConstraint.
type BaseResourceManager interface {
// GetID returns the string identifier of this resource manager.
GetID() string
// AllocateResource requests an allocation for allocationToken in namespace
// under the given constraints and reports the resulting allocation status.
AllocateResource(ctx context.Context, namespace pluginCore.ResourceNamespace, allocationToken Token, constraints []FullyQualifiedResourceConstraint) (pluginCore.AllocationStatus, error)
// ReleaseResource releases the allocation held by allocationToken in namespace.
ReleaseResource(ctx context.Context, namespace pluginCore.ResourceNamespace, allocationToken Token) error
}