This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
/
registry.go
85 lines (69 loc) · 2.32 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package pluginmachinery
import (
"context"
"sync"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s"
)
type taskPluginRegistry struct {
m sync.Mutex
k8sPlugin []k8s.PluginEntry
corePlugin []core.PluginEntry
}
// A singleton variable that maintains a registry of all plugins. The framework uses this to access all plugins
var pluginRegistry = &taskPluginRegistry{}
func PluginRegistry() TaskPluginRegistry {
return pluginRegistry
}
// Use this method to register Kubernetes Plugins
func (p *taskPluginRegistry) RegisterK8sPlugin(info k8s.PluginEntry) {
if info.ID == "" {
logger.Panicf(context.TODO(), "ID is required attribute for k8s plugin")
}
if len(info.RegisteredTaskTypes) == 0 {
logger.Panicf(context.TODO(), "K8s Plugin should be registered to handle atleast one task type")
}
if info.Plugin == nil {
logger.Panicf(context.TODO(), "K8s Plugin cannot be nil")
}
if info.ResourceToWatch == nil {
logger.Panicf(context.TODO(), "The framework requires a K8s resource to watch, for valid plugin registration")
}
p.m.Lock()
defer p.m.Unlock()
p.k8sPlugin = append(p.k8sPlugin, info)
}
// Use this method to register core plugins
func (p *taskPluginRegistry) RegisterCorePlugin(info core.PluginEntry) {
if info.ID == "" {
logger.Panicf(context.TODO(), "ID is required attribute for k8s plugin")
}
if len(info.RegisteredTaskTypes) == 0 {
logger.Panicf(context.TODO(), "Plugin should be registered to handle atleast one task type")
}
if info.LoadPlugin == nil {
logger.Panicf(context.TODO(), "PluginLoader cannot be nil")
}
p.m.Lock()
defer p.m.Unlock()
p.corePlugin = append(p.corePlugin, info)
}
// Returns a snapshot of all the registered core plugins.
func (p *taskPluginRegistry) GetCorePlugins() []core.PluginEntry {
p.m.Lock()
defer p.m.Unlock()
return append(p.corePlugin[:0:0], p.corePlugin...)
}
// Returns a snapshot of all registered K8s plugins
func (p *taskPluginRegistry) GetK8sPlugins() []k8s.PluginEntry {
p.m.Lock()
defer p.m.Unlock()
return append(p.k8sPlugin[:0:0], p.k8sPlugin...)
}
type TaskPluginRegistry interface {
RegisterK8sPlugin(info k8s.PluginEntry)
RegisterCorePlugin(info core.PluginEntry)
GetCorePlugins() []core.PluginEntry
GetK8sPlugins() []k8s.PluginEntry
}