This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathqueues.go
117 lines (99 loc) · 3.57 KB
/
queues.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package executions
import (
"context"
"math/rand"
"github.com/lyft/flyteadmin/pkg/manager/impl/resources"
"github.com/lyft/flyteadmin/pkg/manager/interfaces"
"github.com/lyft/flyteadmin/pkg/repositories"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
)
// Types backing the tag-based execution queue lookup.
type (
	// tag is an alias for string; tags are the keys of queueConfig.
	tag = string

	// singleQueueConfiguration describes one selectable execution queue.
	// DynamicQueue is filled in from an execution queue's Dynamic value when
	// the lookup table is rebuilt.
	singleQueueConfiguration struct {
		DynamicQueue string
	}

	// queues is the list of queue configurations registered under one tag.
	queues = []singleQueueConfiguration

	// queueConfig maps each tag to the queues that advertise it.
	queueConfig = map[tag]queues
)
// QueueAllocator chooses the execution queue configuration to use for an
// identifier.
type QueueAllocator interface {
	// GetQueue returns the queue configuration selected for identifier.
	GetQueue(ctx context.Context, identifier core.Identifier) singleQueueConfiguration
}
// queueAllocatorImpl is the QueueAllocator implementation in this file. It
// resolves queues from tags supplied by matchable-resource overrides or by
// the configured workflow defaults.
type queueAllocatorImpl struct {
	// queueConfigMap is the tag -> queues lookup table; it is replaced
	// wholesale by refreshExecutionQueues.
	queueConfigMap queueConfig
	// config supplies the execution queues and workflow configs read by GetQueue.
	config runtimeInterfaces.Configuration
	// db is the repository handle the allocator was constructed with.
	db repositories.RepositoryInterface
	// resourceManager is queried for EXECUTION_QUEUE overrides in GetQueue.
	resourceManager interfaces.ResourceInterface
}
// refreshExecutionQueues rebuilds the tag -> queues lookup table from
// executionQueues and stores it on the allocator, replacing the previous
// table. Each queue is registered once under every one of its attributes.
func (q *queueAllocatorImpl) refreshExecutionQueues(executionQueues []runtimeInterfaces.ExecutionQueue) {
	logger.Debug(context.Background(), "refreshing execution queues")

	rebuilt := queueConfig{}
	for _, executionQueue := range executionQueues {
		entry := singleQueueConfiguration{DynamicQueue: executionQueue.Dynamic}
		for _, attribute := range executionQueue.Attributes {
			// Appending to the (possibly nil) existing slice covers both the
			// first and subsequent registrations of a tag.
			rebuilt[attribute] = append(rebuilt[attribute], entry)
		}
	}

	q.queueConfigMap = rebuilt
}
// GetQueue picks the execution queue configuration for identifier.
//
// Tag lists are consulted in this order:
//  1. The tags of an EXECUTION_QUEUE matchable resource fetched for the
//     identifier's project, domain and name, if one is returned.
//  2. The tags of the workflow config whose Domain equals the identifier's
//     domain.
//  3. The tags of the workflow config with an empty Domain, used only when
//     step 2 produced no tags.
//
// Within a tag list, the first tag that has queues registered wins and one of
// its queues is chosen at random. A failure to fetch the override is logged
// and otherwise ignored. When nothing matches, the zero-value
// singleQueueConfiguration is returned.
func (q *queueAllocatorImpl) GetQueue(ctx context.Context, identifier core.Identifier) singleQueueConfiguration {
	// NOTE: If refreshing the execution queues & workflow configs on every call to GetQueue becomes too slow we
	// should investigate caching the computed queue assignments.
	// NOTE(review): the refresh below reassigns q.queueConfigMap without any visible locking — confirm whether
	// GetQueue can be called concurrently.
	q.refreshExecutionQueues(q.config.QueueConfiguration().GetExecutionQueues())

	// pickForTags returns a random queue for the first tag that has any
	// queues registered, or false when none of the tags is known.
	pickForTags := func(candidateTags []string) (singleQueueConfiguration, bool) {
		for _, candidate := range candidateTags {
			if available, found := q.queueConfigMap[candidate]; found {
				return available[rand.Intn(len(available))], true
			}
		}
		return singleQueueConfiguration{}, false
	}

	override, err := q.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
		Project:      identifier.Project,
		Domain:       identifier.Domain,
		Workflow:     identifier.Name,
		ResourceType: admin.MatchableResource_EXECUTION_QUEUE,
	})
	if err != nil {
		// Best effort: fall through to the configured defaults.
		logger.Warningf(ctx, "Failed to fetch override values when assigning execution queue for [%+v] with err: %v",
			identifier, err)
	}

	if override != nil && override.Attributes != nil {
		if queueAttributes := override.Attributes.GetExecutionQueueAttributes(); queueAttributes != nil {
			if chosen, found := pickForTags(queueAttributes.Tags); found {
				return chosen
			}
		}
	}

	// No usable override: look for a workflow config specific to this domain,
	// remembering the domain-less config as the fallback.
	var domainTags, fallbackTags []string
	for _, workflowConfig := range q.config.QueueConfiguration().GetWorkflowConfigs() {
		switch {
		case workflowConfig.Domain == identifier.Domain:
			domainTags = workflowConfig.Tags
		case len(workflowConfig.Domain) == 0:
			fallbackTags = workflowConfig.Tags
		}
	}
	if len(domainTags) == 0 {
		// Use the uber-default queue
		domainTags = fallbackTags
	}

	if chosen, found := pickForTags(domainTags); found {
		return chosen
	}

	logger.Infof(ctx, "found no matching queue for [%+v]", identifier)
	return singleQueueConfiguration{}
}
// NewQueueAllocator builds a QueueAllocator backed by config and db. The
// resource manager used for override lookups is created from db and the
// application configuration. The tag lookup table is left empty; GetQueue
// rebuilds it on each call.
func NewQueueAllocator(config runtimeInterfaces.Configuration, db repositories.RepositoryInterface) QueueAllocator {
	return &queueAllocatorImpl{
		config:          config,
		db:              db,
		resourceManager: resources.NewResourceManager(db, config.ApplicationConfiguration()),
	}
}